Flink 应用

Flink 应用

  • Flink 应用的特点
  • Flink 应用的组成
    • 数据源(Source)
    • 数据流处理逻辑(Processing Logic)
    • 数据目的地(Sink)
    • 运行时配置(Runtime Configuration)
    • 状态(State)
    • 连接器(Connectors)
    • 部署模式(Deployment Mode)
    • 监控与调试(Monitoring & Debugging)
    • 检查点与快照(Checkpoint & Savepoint)
  • Flink 应用示例
    • 前置条件
    • 依赖库
    • 代码实现
      • 代码主流程
      • 自定义 SQS Source
      • 自定义 DynamoDB Sink
    • 运行与调试
    • 扩展

《Big Data 流处理框架 Flink》有介绍开源的分布式流处理框架 Apache Flink 基本特点。Flink 应用是指使用 Apache Flink 编写并运行的数据处理程序,Flink 应用主要用于处理大规模的数据流,执行复杂的数据转换、聚合和分析任务。

Flink 应用的特点

  • 实时流处理 (Stream Processing):
    Flink 以事件驱动的方式处理数据流,支持毫秒级延迟。
    可处理无界数据流(持续不断到达的实时数据)。
  • 批量处理 (Batch Processing):
    Flink 也可以处理有限的数据集,将其视为特殊的流。
  • 分布式和高吞吐:
    Flink 应用以分布式方式运行,能处理 PB 级别的数据。
    支持高并发和低延迟。
  • 状态管理:
    Flink 提供强大的状态管理功能,支持有状态流计算。
    支持精确一次(exactly-once)的处理语义。
  • 容错机制:
    通过检查点(Checkpoint)和保存点(Savepoint)实现应用的容错和恢复。
  • 多种数据源与接收器支持:
    支持 Kafka、HDFS、Elasticsearch、JDBC、S3 等多种数据源和目标。

Flink 应用的组成

Flink 应用通常由以下几个关键部分组成,构建和运行一个 Flink 应用时,这些组件共同协作:

数据源(Source)

数据源是 Flink 应用的起点,用于从外部系统中读取数据,例如:
消息队列:Amazon SQS、Kafka、RabbitMQ 等
数据库:MySQL、PostgreSQL 等
文件系统:HDFS、S3、本地文件系统
实时数据流:Socket、API 等
Flink 提供了多种内置的连接器和自定义数据源接口,可以扩展支持其他数据源。

数据流处理逻辑(Processing Logic)

核心处理逻辑决定了如何对数据进行变换和计算,常见的操作包括:

  • 转换(Transformation):
    map:一对一的映射操作。
    flatMap:一对多的映射操作。
    filter:筛选符合条件的数据。
    keyBy:对流进行分区(按键分组)。
  • 窗口(Windowing):
    定义基于时间或事件的窗口(滚动窗口、滑动窗口、会话窗口)。
  • 聚合(Aggregation):
    对数据进行统计计算,例如计数、求和、平均等。
  • 状态管理(State Management):
    维护有状态的计算,用于跟踪中间结果或上下文。

数据目的地(Sink)

数据目的地是 Flink 应用的终点,用于将处理结果输出到目标系统,例如:
消息队列:Kafka、RabbitMQ
数据库:MySQL、Elasticsearch、Amazon DynamoDB
存储系统:HDFS、S3、本地文件
实时分析系统:Redis、Cassandra
类似于数据源,Flink 提供了许多内置的 Sink 连接器。

运行时配置(Runtime Configuration)

Flink 应用需要通过配置来控制运行时的行为:
并行度(Parallelism):控制任务在集群中的并发执行程度。
检查点(Checkpointing):开启容错功能,定期保存状态。
重启策略(Restart Strategy):定义失败后应用的重启逻辑。
资源管理:控制内存、CPU 的使用。
任务分布:配置任务的 Slot 和 Operator Chain。

状态(State)

Flink 提供强大的状态管理,支持两种类型:
键控状态(Keyed State):与分区数据相关联,常用于有状态计算。
算子状态(Operator State):与任务实例相关联,常用于处理非分区数据的场景。
状态支持持久化存储(如 RocksDB),并与检查点机制配合实现容错。

连接器(Connectors)

用于与外部系统交互的模块,负责数据的输入和输出,常见的连接器包括:
Kafka、ElasticSearch、HDFS、Cassandra、JDBC 等。

部署模式(Deployment Mode)

Flink 应用可以部署在不同的环境中:
Standalone 模式:Flink 集群独立运行。
YARN 模式:与 Hadoop 集成。
Kubernetes 模式:容器化部署,适合云原生场景。
Local 模式:单机运行,适合开发和调试。

监控与调试(Monitoring & Debugging)

Flink 提供了 Web UI 和 REST API,用于监控任务运行状态。
可以查看任务拓扑、指标、延迟、吞吐量等信息,支持日志追踪和故障分析。

检查点与快照(Checkpoint & Savepoint)

Checkpoint:定期保存应用的状态,用于容错。
Savepoint:手动触发的状态快照,用于升级或迁移任务。

这些部分构成了 Flink 应用的完整生态,从数据输入到数据输出、从开发到部署和运行。

Flink 应用示例

以下是一个完整的 Flink 应用示例,从 Amazon SQS 读取数据,执行 ETL(数据清洗、转换等操作),并将结果写入 Amazon DynamoDB:

前置条件

安装 Flink 集群或本地开发环境。
配置 AWS 凭证,确保对 SQS 和 DynamoDB 的访问权限。
确保 SQS 队列和 DynamoDB 表已创建。

依赖库

确保在项目中引入以下依赖(以 Maven 为例):

<dependencies><!-- Flink Core --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.17.0</version></dependency><!-- AWS SDK --><dependency><groupId>software.amazon.awssdk</groupId><artifactId>sqs</artifactId><version>2.20.16</version></dependency><dependency><groupId>software.amazon.awssdk</groupId><artifactId>dynamodb</artifactId><version>2.20.16</version></dependency>
</dependencies>

代码实现

代码主流程

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.PutItemResponse;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;import java.util.HashMap;
import java.util.Map;public class FlinkSQSToDynamoDB {public static void main(String[] args) throws Exception {// 1. 创建 Flink 执行环境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 定义 SQS 队列 URL 和配置String sqsQueueUrl = "https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>";SqsClient sqsClient = SqsClient.builder().build();// 3. 从 SQS 队列读取数据DataStream<String> rawStream = env.addSource(new SQSSourceFunction(sqsClient, sqsQueueUrl));// 4. 数据转换逻辑(ETL)DataStream<Map<String, String>> transformedStream = rawStream.map((MapFunction<String, Map<String, String>>) message -> {// 假设消息是 JSON 格式,解析并转换为键值对Map<String, String> transformed = new HashMap<>();transformed.put("id", extractField(message, "id"));transformed.put("name", extractField(message, "name").toUpperCase());transformed.put("timestamp", String.valueOf(System.currentTimeMillis()));return transformed;});// 5. 将转换后的数据写入 DynamoDBtransformedStream.addSink(new DynamoDBSink());// 6. 启动任务env.execute("Flink SQS to DynamoDB ETL");}// 提取字段(简单 JSON 解析示例)private static String extractField(String json, String field) {// 简单的 JSON 解析,可以使用库如 Jackson 或 Gsonint start = json.indexOf("\"" + field + "\":\"") + field.length() + 3;int end = json.indexOf("\"", start);return json.substring(start, end);}
}

自定义 SQS Source

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.Message;import java.util.List;public class SQSSourceFunction implements SourceFunction<String> {private final SqsClient sqsClient;private final String sqsQueueUrl;private volatile boolean isRunning = true;public SQSSourceFunction(SqsClient sqsClient, String sqsQueueUrl) {this.sqsClient = sqsClient;this.sqsQueueUrl = sqsQueueUrl;}@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (isRunning) {ReceiveMessageRequest request = ReceiveMessageRequest.builder().queueUrl(sqsQueueUrl).maxNumberOfMessages(10).build();List<Message> messages = sqsClient.receiveMessage(request).messages();for (Message message : messages) {ctx.collect(message.body());}Thread.sleep(1000); // 控制读取频率}}@Overridepublic void cancel() {isRunning = false;}
}

自定义 DynamoDB Sink

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;import java.util.Map;public class DynamoDBSink implements SinkFunction<Map<String, String>> {private final DynamoDbClient dynamoDbClient;private final String tableName;public DynamoDBSink() {this.dynamoDbClient = DynamoDbClient.builder().build();this.tableName = "YourDynamoDBTable";}@Overridepublic void invoke(Map<String, String> value, Context context) {PutItemRequest request = PutItemRequest.builder().tableName(tableName).item(convertToDynamoDBItem(value)).build();dynamoDbClient.putItem(request);}private Map<String, AttributeValue> convertToDynamoDBItem(Map<String, String> data) {Map<String, AttributeValue> item = new HashMap<>();data.forEach((key, value) -> item.put(key, AttributeValue.builder().s(value).build()));return item;}
}

运行与调试

编译与打包:将项目打包为 JAR 文件。
提交任务:将 JAR 文件提交到 Flink 集群运行。
监控任务:通过 Flink 的 Web UI 查看任务运行状态。

扩展

错误处理:为 SQS Source 和 DynamoDB Sink 增加异常处理和重试机制。
性能优化:使用批量写入 DynamoDB(batchWriteItem)。
状态管理:添加状态用于去重或处理幂等性问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/2211.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c++领域展开第十二幕——类和对象(STL简介——简单了解STL)超详细!!!!

文章目录 前言STL简介什么是STLSTL的版本STL的六大组件STL的重要性如何学习STL 总结 前言 上篇博客我们了解了初阶的模版函数&#xff0c;以及有关的一些使用方法。 今天我们来了解了解STL库的有关知识 跟我一起上车吧 STL简介 什么是STL STL&#xff1a;是C标准库的重要组成…

音频语言模型与多模态体系结构

音频语言模型与多模态体系结构 多模态模型正在创造语言、视觉和语音等以前独立的研究领域的协同效应。这些模型使用通用架构,将每种模式视为不同的“token”,使它们能够以一种与人类认知非常相似的方式联合建模和理解世界。 ​ ​可以将多模态分为两个主要领域:输入空间(…

HTML中最基本的东西

本文内容的标签&#xff0c;将是看懂HTML的最基本之基本 &#xff0c;是跟您在写文章时候一样内容。一般想掌握极其容易&#xff0c;但是也要懂得如何使用&#xff0c;过目不忘&#xff0c;为手熟尔。才是我们学习的最终目的。其实边看边敲都行&#xff0c;或者是边看边复制粘贴…

LVGL移植高通点阵字库GT30L24A3W

字库芯片: GT30L24A3W MCU:STM32F429 LVGL版本:V8.4 一、实现gt_read_data() 和 r_dat_bat() 请参考下面视频 如何在32位MCU上使用高通点阵字库_哔哩哔哩_bilibili 高通字库使用教程(1)硬件链接与注意事项部分_哔哩哔哩_bilibili 高通字库使用教程(2)SPI底层函数使用_哔哩…

计算机的错误计算(二百一十二)

摘要 利用两个大模型计算 实验表明&#xff0c;两个大模型均进行了中肯的分析。另外&#xff0c;其中一个大模型给出了 Python代码&#xff0c;运行后&#xff0c;结果中有7位错误数字&#xff1b;而一个大模型进行加减运算时出错。 例1. 计算 下面是与一个大模型的对话…

蓝桥与力扣刷题(709 转换成小写字母)

题目&#xff1a;给你一个字符串 s &#xff0c;将该字符串中的大写字母转换成相同的小写字母&#xff0c;返回新的字符串。 示例 1&#xff1a; 输入&#xff1a;s "Hello" 输出&#xff1a;"hello"示例 2&#xff1a; 输入&#xff1a;s "here…

9.7 visual studio 搭建yolov10的onnx的预测(c++)

1.环境配置 在进行onnx预测前&#xff0c;需要搭建的环境如下: 1.opencv环境的配置&#xff0c;可参考博客:9.2 c搭建opencv环境-CSDN博客 2.libtorch环境的配置&#xff0c;可参考博客&#xff1a;9.4 visualStudio 2022 配置 cuda 和 torch (c)-CSDN博客 3.cuda环境的配置…

自建RustDesk服务器

RustDesk服务端 下面的截图是我本地的一个服务器做为演示用&#xff0c;你自行的搭建服务需要该服务器有固定的ip地址 1、通过宝塔面板快速安装 2、点击【安装】后会有一个配置信息&#xff0c;默认即可 3、点击【确认】后会自动安装等待安装完成 4、安装完成后点击【打开…

浅谈云计算15 | 存储可靠性技术(RAID)

存储可靠性技术 一、存储可靠性需求1.1 数据完整性1.2 数据可用性1.3 故障容错性 二、传统RAID技术剖析2.1 RAID 02.2 RAID 12.3 RAID 52.4 RAID 62.5 RAID 10 三、RAID 2.0技术3.1 RAID 2.0技术原理3.1.1 两层虚拟化管理模式3.1.2 数据分布与重构 3.2 RAID 2.0技术优势3.2.1 自…

Android JecPack组件之LifeCycles 使用详解

一、背景 LifeCycle 是一个可以感知宿主生命周期变化的组件。常见的宿主包括 Activity/Fragment、Service 和 Application。LifeCycle 会持有宿主的生命周期状态的信息&#xff0c;当宿主生命周期发生变化时&#xff0c;会通知监听宿主的观察者。 LifeCycle 的出现主要是为了…

Facebook 隐私风波:互联网时代数据安全警钟

在社交媒体飞速发展的今天&#xff0c;个人数据的隐私保护已成为全球关注的焦点。作为全球最大的社交平台之一&#xff0c;Facebook面临的隐私问题&#xff0c;尤其是数据泄露事件&#xff0c;频繁引发公众的广泛讨论。从用户信息被滥用到数据泄漏&#xff0c;Facebook的隐私挑…

candb++ windows11运行报错,找不到mfc140.dll

解决问题记录 mfc140.dll下载 注意&#xff1a;放置位置别搞错了

蓝桥杯备赛:顺序表和单链表相关算法题详解(上)

目录 一.询问学号&#xff08;顺序表&#xff09; 1.题目来源&#xff1a; 2.解析与代码实现&#xff1a; &#xff08;1&#xff09;解析&#xff1a; &#xff08;2&#xff09;代码实现&#xff1a; 二.寄包柜&#xff08;顺序表&#xff09; 1.题目来源&#xff1a; …

uni-app的学习

uni-app 有着跨平台支持、丰富的插件和生态系统、高性能、集成开发工具HBuilderX的配合使用。允许使用者仅通过一套代码发布到多平台使用。 uni-app官网 uni-app 是一个适合开发跨平台移动应用和小程序的框架&#xff0c;能够大幅提高开发效率。 一、了解 1.1 工具准备 从Git…

基于光偏振与光学调制实现白光干涉相移

基于光的偏振特性和一些光学元件对光的调制作用&#xff0c;实现白光干涉中的光学相移原理是一个复杂而精细的过程。以下是对这一原理的详细解释&#xff1a; 一、光的偏振特性 光的偏振是指光波在传播过程中&#xff0c;光矢量的方向和大小有规则变化的现象。圆偏振光的电场…

Flutter:封装ActionSheet 操作菜单

演示效果图 action_sheet_util.dart import package:ducafe_ui_core/ducafe_ui_core.dart; import package:flutter/material.dart; import package:demo/common/index.dart;class ActionSheetUtil {/// 底部操作表/// [context] 上下文/// [title] 标题/// [items] 选项列表 …

使用yarn命令创建Vue3项目

文章目录 1.技术栈2.创建流程2.1创建vue3项目2.2选择配置项2.3进入项目目录 3.使用Yarn启动项目3.1安装依赖3.2运行项目 1.技术栈 yarnvitevue3 2.创建流程 2.1创建vue3项目 vue create 项目名称2.2选择配置项 直接回车可选择Vue3 2.3进入项目目录 cd 项目名称默认在当前…

【Node.js的安装与配置】

目录&#xff1a; 一&#xff1a;下载Node.js二&#xff1a;安装Node.js三&#xff1a;配置存放目录四&#xff1a;配置环境变量五&#xff1a;配置淘宝镜像六&#xff1a;测试Node.js 一&#xff1a;下载Node.js &#x1f534; 下载地址&#xff1a;https://www.nodejs.com.cn…

【AIGC】SYNCAMMASTER:多视角多像机的视频生成

标题&#xff1a;SYNCAMMASTER: SYNCHRONIZING MULTI-CAMERA VIDEO GENERATION FROM DIVERSE VIEWPOINTS 主页&#xff1a;https://jianhongbai.github.io/SynCamMaster/ 代码&#xff1a;https://github.com/KwaiVGI/SynCamMaster 文章目录 摘要一、引言二、使用步骤2.1 TextT…

左神算法基础提升--1

文章目录 哈希函数哈希函数的主要特点确定性快速计算输出长度固定离散性 哈希表哈希表的原理解题 布隆过滤器布隆过滤器的主要特点高效性快速查询空间效率误报率 布隆过滤器的原理 一致性哈希一致性哈希原理一致性哈希应用 哈希函数 哈希函数是一种将任意长度的输入&#xff0…