Spring Boot 整合 Apache Flink 教程

精心整理了最新的面试资料和简历模板,有需要的可以自行获取

点击前往百度网盘获取
点击前往夸克网盘获取


Spring Boot 整合 Apache Flink 教程

一、背景与目标

Apache Flink 是一个高性能的分布式流处理框架,而Spring Boot提供了快速构建企业级应用的能力。整合二者可实现:

  1. 利用Spring Boot的依赖注入、配置管理等功能简化Flink作业开发
  2. 构建完整的微服务架构,将流处理嵌入Spring生态
  3. 实现动态作业提交与管理

二、环境准备

  • JDK 17+
  • Maven 3.8+
  • Spring Boot 3.1.5
  • Flink 1.17.2

三、创建项目 & 添加依赖

1. 创建Spring Boot项目

使用Spring Initializr生成基础项目,选择:

  • Maven
  • Spring Web(可选,用于创建REST接口)

2. 添加Flink依赖

<!-- pom.xml -->
<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Flink核心依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.2</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.2</version><scope>provided</scope></dependency><!-- 本地执行时需添加 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime</artifactId><version>1.17.2</version><scope>test</scope></dependency>
</dependencies>

四、基础整合示例

1. 编写Flink流处理作业

// src/main/java/com/example/demo/flink/WordCountJob.javaimport org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCountJob {public static void execute() throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = env.fromElements("Spring Boot整合Flink","Flink实时流处理","Spring生态集成");DataStream<WordCount> counts = text.flatMap(new FlatMapFunction<String, WordCount>() {@Overridepublic void flatMap(String value, Collector<WordCount> out) {for (String word : value.split("\\s")) {out.collect(new WordCount(word, 1L));}}}).keyBy(value -> value.word).sum("count");counts.print();env.execute("Spring Boot Flink Job");}public static class WordCount {public String word;public long count;public WordCount() {}public WordCount(String word, long count) {this.word = word;this.count = count;}@Overridepublic String toString() {return word + " : " + count;}}
}

2. 在Spring Boot中启动作业

// src/main/java/com/example/demo/DemoApplication.java@SpringBootApplication
public class DemoApplication implements CommandLineRunner {public static void main(String[] args) {SpringApplication.run(DemoApplication.class, args);}@Overridepublic void run(String... args) throws Exception {WordCountJob.execute(); // 启动Flink作业}
}

五、进阶整合 - 通过REST API动态提交作业

1. 创建Job提交服务

// src/main/java/com/example/demo/service/FlinkJobService.java@Service
public class FlinkJobService {public String submitWordCountJob(List<String> inputLines) {try {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = env.fromCollection(inputLines);// ...(同上WordCount逻辑)JobExecutionResult result = env.execute();return "JobID: " + result.getJobID();} catch (Exception e) {return "Job Failed: " + e.getMessage();}}
}

2. 创建REST控制器

// src/main/java/com/example/demo/controller/JobController.java@RestController
@RequestMapping("/jobs")
public class JobController {@Autowiredprivate FlinkJobService flinkJobService;@PostMapping("/wordcount")public String submitWordCount(@RequestBody List<String> inputs) {return flinkJobService.submitWordCountJob(inputs);}
}

六、关键配置说明

1. application.properties

# 设置Flink本地执行环境
spring.flink.local.enabled=true
spring.flink.job.name=SpringBootFlinkJob# 调整并行度(根据CPU核心数)
spring.flink.parallelism=4

2. 解决依赖冲突

在pom.xml中排除冲突依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.17.2</version><exclusions><exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion></exclusions>
</dependency>

七、运行与验证

  1. 启动Spring Boot应用:
mvn spring-boot:run
  1. 调用API提交作业:
curl -X POST -H "Content-Type: application/json" \
-d '["Hello Flink", "Spring Boot Integration"]' \
http://localhost:8080/jobs/wordcount
  1. 查看控制台输出:
Flink> Spring : 1
Flink> Boot : 1
Flink> Integration : 1
...

八、生产环境注意事项

  1. 集群部署:将打包后的jar提交到Flink集群

    flink run -c com.example.demo.DemoApplication your-application.jar
    
  2. 状态管理:集成Flink State Backend(如RocksDB)

  3. 监控集成:通过Micrometer接入Spring Boot Actuator

  4. 资源隔离:使用YarnKubernetes部署模式


九、完整项目结构

src/
├── main/
│   ├── java/
│   │   ├── com/example/demo/
│   │   │   ├── DemoApplication.java
│   │   │   ├── flink/
│   │   │   │   └── WordCountJob.java
│   │   │   ├── controller/
│   │   │   ├── service/
│   ├── resources/
│   │   └── application.properties
pom.xml

通过以上步骤,即可实现Spring Boot与Apache Flink的深度整合。这种架构特别适合需要将实时流处理能力嵌入微服务体系的场景,如实时风控系统、IoT数据处理平台等。后续可扩展集成Kafka、HBase等大数据组件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/38821.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

QT三 自定义控件

一 自定义控件 现在的需求是这样&#xff1a; 假设我们要在QWidget 上做定制&#xff0c;这个定制包括了关于 一些事件处理&#xff0c;意味着要重写QWidget的一些代码&#xff0c;这是不实际的&#xff0c;因此我们需要自己写一个MyWidget继承QWidget&#xff0c;然后再MyWi…

【C++ 进阶】语句:从基础到实践

目录 一、输入输出体系的范式革命 1.1 C语言的格式化 1.2 C的流抽象革命 二、字符串处理的抽象跃迁 2.1 C语言的字符指针 2.2 C的string类革命 三、结构体到类的类型系统进化 3.1 C语言的结构体局限 3.2 C类的革命性演进 四、基础控制语句差异 4.1 条件语句&#xf…

C语言操作符

&#x1f31f; 各位看官好&#xff0c;我是maomi_9526&#xff01; &#x1f30d; 种一棵树最好是十年前&#xff0c;其次是现在&#xff01; &#x1f680; 今天来学习C语言的相关知识。 &#x1f44d; 如果觉得这篇文章有帮助&#xff0c;欢迎您一键三连&#xff0c;分享给更…

PostgreSQL:语言基础与数据库操作

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;精通Java编…

KMP算法

KMP算法 为什么叫做KMP呢。 因为是由这三位学者发明的&#xff1a;Knuth&#xff0c;Morris和Pratt&#xff0c;所以取了三位学者名字的首字母。所以叫做KMP next数组就是一个前缀表&#xff08;prefix table&#xff09;。 前缀表是用来回退的&#xff0c;它记录了模式串与…

3D点云数据处理中的聚类算法总结

1.欧式聚类&#xff1a; 基于点的空间距离&#xff08;欧几里得距离&#xff09;来分割点云&#xff0c;将距离较近的点归为同一簇。 欧式聚类需要的参数&#xff1a;邻域半径R,簇的最小点阈值minPts&#xff0c;最大点数阈值maxPts。 实现效率&#xff1a; O(n * log n) 实现…

WRC世界机器人大会-2024年展商汇总

2024世界机器人大会 时间&#xff1a;2024年8月21日至25日 地点&#xff1a;北京经济技术开发区北人亦创国际会展中心 大会主题&#xff1a;共育新质生产力&#xff0c;共享智能新未来 2024世界机器人博览会亮点纷呈&#xff0c;20余款人形机器人整机将亮相博览会&#xff…

拉取镜像,推送到阿里云镜像仓库

需求背景&#xff1a;在学习k8s&#xff0c;虚拟机无法正常拉取 wangyanglinux/tools:busybox 镜像。 解决办法&#xff1a;将墙外镜像拉到国内&#xff08;阿里云&#xff09;再使用 准备工作需要创建对应的镜像仓库&#xff0c;然后再进行推送 1. 拉取镜像 docker pull …

DeepSeek和Kimi在Neo4j中的表现

以下是2个最近爆火的人工智能工具&#xff0c; DeepSeek:DeepSeek Kimi: Kimi - 会推理解析&#xff0c;能深度思考的AI助手 1、提示词&#xff1a; 你能帮我生成一个知识图谱吗&#xff0c;等一下我会给你一篇文章&#xff0c;帮我从内容中提取关键要素&#xff0c;然后以N…

哈尔滨工业大学DeepSeek公开课人工智能:大模型原理 技术与应用-从GPT到DeepSeek|附视频下载方法

导 读INTRODUCTION 今天继续哈尔滨工业大学车万翔教授带来了一场主题为“DeepSeek 技术前沿与应用”的报告。 本报告深入探讨了大语言模型在自然语言处理&#xff08;NLP&#xff09;领域的核心地位及其发展历程&#xff0c;从基础概念出发&#xff0c;延伸至语言模型在机器翻…

redis解决缓存穿透/击穿/雪崩

文章目录 1.缓存穿透1.1 概念1.2 解决方案1.2.1 缓存空对象1.2.2 布隆过滤 1.2 店铺查询使用缓存穿透解决方案1.2.1 流程 2.缓存雪崩2.1 什么是缓存雪崩&#xff1f;2.2 雪崩解决方案 3.缓存击穿3.1 什么是缓存击穿&#xff1f;3.2解决方案3.2.1 基于互斥锁解决缓存击穿问题&am…

不连续平面提取

不连续平面提取 提取流程 #mermaid-svg-Y87uP8WsVRmPYriG {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-Y87uP8WsVRmPYriG .error-icon{fill:#552222;}#mermaid-svg-Y87uP8WsVRmPYriG .error-text{fill:#552222;s…

大语言模型-2.2/3-主流模型架构与新型架构

简介 本博客内容是《大语言模型》一书的读书笔记&#xff0c;该书是中国人民大学高瓴人工智能学院赵鑫教授团队出品&#xff0c;覆盖大语言模型训练与使用的全流程&#xff0c;从预训练到微调与对齐&#xff0c;从使用技术到评测应用&#xff0c;帮助学员全面掌握大语言模型的…

数据库操作练习

一.向heros表中新增一列信息&#xff0c;添加一些约束&#xff0c;并尝试查询一些信息 //向表中添加一列age信息 alter table heros add column age int;//id列添加主键约束&#xff0c;设置自增 alter table heros modify column id int auto_increment primary key;//name列…

CTF【WEB】学习笔记1号刊

Kali的小工具箱 curl www.xxx.com&#xff1a;查看服务器响应返回的信息 curl -I www.xxx.com:查看响应的文件头 一、cmd执行命令 ipconfig&#xff1a;ip地址配置等&#xff1b; 二、 Kali操作 1.sudo su&#xff1b; 2.msfconsole 3.search ms17_010 永恒之蓝&#xff…

在 SaaS 应用上构建 BI 能力的实战之路

SaaS 产品在持续运营过程中积累了大量数据&#xff0c;这些数据不仅是数字的记录&#xff0c;更是洞察市场趋势、优化产品功能、提升用户体验的宝贵资源。 因此&#xff0c;大部分的 SaaS 产品在发展到一定阶段后&#xff0c;都会开始构建自己的报表模块或分析模块&#xff0c;…

gonet开源游戏服务器环境配置

1.mysql搭建 搜索mysql-server apt安装包名 sudo apt search mysql-server 安装mysql-server sudo apt-get install mysql-server 安装完成后会&#xff0c;启动mysql服务及创建系统服务 查看服务状态 systemctl status mysql.service 使用超级权限登陆mysql sudo mysql 授…

STM32基础篇(五)------TIM定时器比较输出

简介 定时器的类型 在《STM32F10xxx参考手册&#xff08;中文&#xff09;.pdf》中可以看到下面三个章节 因此可以得到 高级定时器含有通用定时器的所有功能&#xff0c;通用定时器含有基本定时器的所有功能&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;…

基于STM32的两路电压测量仿真设计Proteus仿真+程序设计+设计报告+讲解视频

基于STM32两路电压测量仿真设计(Proteus仿真程序设计设计报告讲解视频&#xff09; 仿真图Proteus 8.9 程序编译器&#xff1a;keil 5 编程语言&#xff1a;C语言 设计编号&#xff1a;C0106 1.主要功能 基于STM32单片机设计一个双路电压检测器 1.系统可以测量两路输入电…

210、【图论】课程表(Python)

题目 思路 这道题本质上是一个拓扑排序。每次先统计每个点的入度个数、然后再统计点与点之间的邻接关系&#xff0c;找到入度为0的点作为起始遍历点。之后每遍历到这个点之后&#xff0c;就把这个点后续的邻接关系边的点入度减去一。当某个点入度为0时&#xff0c;继续被加入其…