k8s 搭建基于session模式的flink集群

1.flink集群搭建

不废话直接上代码,都是基于官网的,在此记录一下 Kubernetes | Apache Flink

flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:name: flink-configlabels:app: flink
data:flink-conf.yaml: |+jobmanager.rpc.address: flink-jobmanagertaskmanager.numberOfTaskSlots: 2blob.server.port: 6124jobmanager.rpc.port: 6123taskmanager.rpc.port: 6122jobmanager.memory.process.size: 1600mtaskmanager.memory.process.size: 1728mparallelism.default: 2    log4j-console.properties: |+# This affects logging for both user code and FlinkrootLogger.level = INFOrootLogger.appenderRef.console.ref = ConsoleAppenderrootLogger.appenderRef.rolling.ref = RollingFileAppender# Uncomment this if you want to _only_ change Flink's logging#logger.flink.name = org.apache.flink#logger.flink.level = INFO# The following lines keep the log level of common libraries/connectors on# log level INFO. The root logger does not override this. You have to manually# change the log levels here.logger.pekko.name = org.apache.pekkologger.pekko.level = INFOlogger.kafka.name= org.apache.kafkalogger.kafka.level = INFOlogger.hadoop.name = org.apache.hadooplogger.hadoop.level = INFOlogger.zookeeper.name = org.apache.zookeeperlogger.zookeeper.level = INFO# Log all infos to the consoleappender.console.name = ConsoleAppenderappender.console.type = CONSOLEappender.console.layout.type = PatternLayoutappender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n# Log all infos in the given rolling fileappender.rolling.name = RollingFileAppenderappender.rolling.type = RollingFileappender.rolling.append = falseappender.rolling.fileName = ${sys:log.file}appender.rolling.filePattern = ${sys:log.file}.%iappender.rolling.layout.type = PatternLayoutappender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%nappender.rolling.policies.type = Policiesappender.rolling.policies.size.type = SizeBasedTriggeringPolicyappender.rolling.policies.size.size=100MBappender.rolling.strategy.type = DefaultRolloverStrategyappender.rolling.strategy.max = 10# Suppress the irrelevant (wrong) warnings from the Netty channel handlerlogger.netty.name = org.jboss.netty.channel.DefaultChannelPipelinelogger.netty.level = OFF    

jobmanager-service.yaml Optional service, which is only necessary for non-HA mode.

apiVersion: v1
kind: Service
metadata:name: flink-jobmanager
spec:type: ClusterIPports:- name: rpcport: 6123- name: blob-serverport: 6124- name: webuiport: 8081selector:app: flinkcomponent: jobmanager

Session cluster resource definitions #

jobmanager-session-deployment-non-ha.yaml

apiVersion: apps/v1
kind: Deployment
metadata:name: flink-jobmanager
spec:replicas: 1selector:matchLabels:app: flinkcomponent: jobmanagertemplate:metadata:labels:app: flinkcomponent: jobmanagerspec:containers:- name: jobmanagerimage: apache/flink:latestargs: ["jobmanager"]ports:- containerPort: 6123name: rpc- containerPort: 6124name: blob-server- containerPort: 8081name: webuilivenessProbe:tcpSocket:port: 6123initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/flink/confsecurityContext:runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessaryvolumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties

taskmanager-session-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:name: flink-taskmanager
spec:replicas: 2selector:matchLabels:app: flinkcomponent: taskmanagertemplate:metadata:labels:app: flinkcomponent: taskmanagerspec:containers:- name: taskmanagerimage: apache/flink:latestargs: ["taskmanager"]ports:- containerPort: 6122name: rpclivenessProbe:tcpSocket:port: 6122initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/flink/conf/securityContext:runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessaryvolumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties

 kubectl apply -f xxx.yaml 或者 kubectl apply -f ./flink  flink为文件夹,存放的是以上这几个.yaml文件

为flink的ui界面添加nodeport即可外部访问

2. demo代码测试

创建一个maven工程,pom.xml引入依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>test-platform</artifactId><groupId>com.test</groupId><version>2.0.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>flink-demo</artifactId><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.0</flink.version><log4j.version>2.20.0</log4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><scope>compile</scope><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><scope>compile</scope><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><scope>compile</scope><version>${log4j.version}</version></dependency></dependencies></project>

log4j2.xml:

<?xml version="1.0" encoding="UTF-8"?>
<configuration monitorInterval="5"><Properties><property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" /><!-- LOG_LEVEL 配置你需要的日志输出级别       --><property name="LOG_LEVEL" value="INFO" /></Properties><appenders><console name="Console" target="SYSTEM_OUT"><PatternLayout pattern="${LOG_PATTERN}"/><ThresholdFilter level="${LOG_LEVEL}" onMatch="ACCEPT" onMismatch="DENY"/></console></appenders><loggers><root level="${LOG_LEVEL}"><appender-ref ref="Console"/></root></loggers></configuration>

计数代码:

package com.test.flink;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCountUnboundStreamDemo {public static void main(String[] args) throws Exception {// TODO 1.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
//                3, // 尝试重启的次数
//                Time.of(10, TimeUnit.SECONDS) // 间隔
//        ));// TODO 2.读取数据DataStreamSource<String> lineDS = env.socketTextStream("192.168.0.28", 7777);// TODO 3.处理数据: 切分、转换、分组、聚合// TODO 3.1 切分、转换SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = lineDS //<输入类型, 输出类型>.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// 按照 空格 切分String[] words = value.split(" ");for (String word : words) {// 转换成 二元组 (word,1)Tuple2<String, Integer> wordsAndOne = Tuple2.of(word, 1);// 通过 采集器 向下游发送数据out.collect(wordsAndOne);}}});// TODO 3.2 分组KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}});// TODO 3.3 聚合SingleOutputStreamOperator<Tuple2<String, Integer>> sumDS = wordAndOneKS.sum(1);// TODO 4.输出数据sumDS.print("接收到的数据=======").setParallelism(1);// TODO 5.执行:类似 sparkstreaming最后 ssc.start()env.execute(sumDS.getClass().getSimpleName());}}

打成jar包导入flink dashboard:

在另一台机器上运行 nc -lk -p 7777,如果出现连接拒绝,查看是否放开端口号

k8s查看读取到的数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/127485.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Vue篇】Vue 项目下载、介绍(详细版)

如何创建一个vue项目&#xff1f;首先要有环境&#xff0c;如下&#xff1a; nodejs vue-cli如果有以上的工具就直接跳过安装教程 【Vue篇】mac上Vue 开发环境搭建、运行Vue项目&#xff08;保姆级&#xff09; 创建vue项目 选择一个位置&#xff0c;你要存放项目的路径&…

海保人寿:开源治理保障科技与保险融合,助力保险业务数字化改革创新

海保人寿保险股份有限公司&#xff08;简称“海保人寿”&#xff09;是第一家在海南筹建开业的全国性保险机构。从成立之初&#xff0c;便深耕于数字化创新&#xff0c;在自身多业务环节中实现数字化转型&#xff0c;依托优秀的研发体系与数智融合的业务系统&#xff0c;不断推…

RocketMQMessageListener使用错误问题分析与排查

背景 RocketMQ与SpingBoot相结合可以大大降低我们开发的复杂度&#xff0c;但是最近在一个新项目中使用RocketMQMessageListener 监听消息&#xff0c;导致消费者启动失败&#xff0c;提示该消费组已经被创建了&#xff0c;请重新申请一个消费者组。 Caused by: org.apache.r…

【深度学习】 Python 和 NumPy 系列教程(三):Python容器:1、列表List详解(初始化、索引、切片、更新、删除、常用函数、拆包、遍历)

目录 一、前言 二、实验环境 三、Python容器&#xff08;Containers&#xff09; 0、容器介绍 1、列表&#xff08;List&#xff09; 1. 初始化 a. 创建空列表 b. 使用现有元素初始化列表 c. 使用列表生成式 d. 复制列表 2. 索引和切片 a. 索引 b. 负数索引 c. 切…

龙迅LT86102UX HDMI一进二出,支持分辨率4K60HZ

龙迅LT86102UXE 1. 描述 龙迅LT86102UX HDMI2.0 分路器具有符合 HDMI2.0/1.4 规范的 1&#xff1a;2 分路器、最大 6Gbps 高速数据速率、自适应均衡 RX 输入和预强调的 TX 输出&#xff0c;支持长电缆应用&#xff0c;板载无 XTAL&#xff0c;可节省 BOM 成本。 LT86102UX HDM…

【Linux】- Linux下搭建Java环境[IDEA,JDK8,Tomcat]

Java环境 1. 安装JDK2.安装tomcat3.安装idea4. 安装MySQL5.7 1. 安装JDK /usr/local&#xff1a;存放用户自行安装的软件&#xff0c;默认情况下不会被系统软件包管理器管理 发现解压后的文件已经整体移动到/usr/local/java 文件夹下 打开bin目录&#xff0c;可以看到java的版…

Nginx参数配置详细说明【全局、http块、server块、events块】【已亲测】

Nginx重点参数配置说明 本文包含Nginx参数配置说明全局块、http块、server块、events块共计30多个参数配置与解释&#xff0c;其中常见参数包含配置错误出现的错误日志&#xff0c;能让你更快的解决问题。 该文的所有参数大部分经过单独测试&#xff0c;错误都是自己收集出来的…

每日刷题-3

目录 一、选择题 二、编程题 1、计算糖果 2、进制转换 一、选择题 1、 解析&#xff1a;在C语言中&#xff0c;以0开头的整数常量是八进制的&#xff0c;而不是十进制的。所以&#xff0c;0123的八进制表示相当于83的十进制表示&#xff0c;而123的十进制表示不变。printf函数…

(翻译)JavaFX高级教程:JavaFX2.0的FXML语言

原文地址http://download.oracle.com/javafx/2.0/fxml_get_started/jfxpub-fxml_get_started.htm FXML是JavaFX 2.0新引入的。你可能会问"What is FXML?" 和"Is FXML for me?" FXML 是基于XML的一种声明性标记语言&#xff0c;用来定义应用的用户接口。F…

QT设计一个小闹钟

设置一个闹钟&#xff0c;左侧窗口显示当前时间&#xff0c;右侧设置时间&#xff0c;以及控制闹钟的开关&#xff0c;下方显示闹钟响时的提示语。当按启动按钮时&#xff0c;设置时间与闹钟提示语均不可再改变。当点击停止时&#xff0c;关闭闹钟并重新启用设置时间与闹钟提示…

【MySQL】详解聚合查询、多表查询

MySQL 增删查改&#xff08;进阶&#xff09; 文章目录 MySQL 增删查改&#xff08;进阶&#xff09;01 表的设计表的三大范式 02 查询操作进阶新增聚合查询countsumavgmaxmin 分组查询 GROUP BYHAVING 联合查询/多表查询关键思路引入内连接外连接左外连接&#xff1a;left joi…

有限状态机的概念

一、有限状态机的概念 有限状态机简称状态机&#xff0c;是表示有限个状态&#xff0c;以及在状态之间的转移和动作等行为的数学模型。状态机的要素有状态和状态转移两个。 在Unity中&#xff0c;动画状态机最重要的属性就是节点和连线&#xff0c;其中每个节点都是一个动画片…

Emscripten安装并配置环境变量

前言 Emscripten官网 官网有安装教程&#xff0c;但有些细节没有讲清楚&#xff0c;本文会很详细的讲解每一步。 一、下载 emsdk 包 emsdk – github地址 可以使用 git 去拉取&#xff0c;不过可能会超时拉取失败。 git clone https://github.com/emscripten-core/emsdk.…

数据结构与算法-队列

一.队列的基本概述 1.队列的定义 答&#xff1a;队列是现在在两端进行插入和删除操作的线性表&#xff0c;"队尾"是允许进行存入…

系统软件启动过程

实验一&#xff1a;系统软件启动过程 参考 重要文件 调用顺序 1. boot/bootasm.S | bootasm.asm&#xff08;修改了名字&#xff0c;以便于彩色显示&#xff09;a. 开启A20 16位地址线 实现 20位地址访问 芯片版本兼容通过写 键盘控制器8042 的 64h端口 与 60h端口。b.…

ApachePulsar原理解析与应用实践(学习笔记一)

随着时代的发展&#xff0c;软件设计的理念也在不断发展&#xff0c;从单体服务、面向服务、微服务&#xff0c;发展到云原生以及无服务。其演变的过程是一个能力不断增强&#xff0c;领域边界不断微分细化的过程。比如无服务就是将函数作为服务&#xff0c;就类似dns模式的服务…

什么是50ETF期权开户条件,怎么开期权交易权限?

50ETF期权是指上证50ETF期权&#xff0c;标的物是上证50ETF&#xff0c;代码是&#xff08;510500&#xff09;&#xff0c;期权是一种在上证50ETF基础上进行衍生品交易的金融工具&#xff0c;下文科普什么是50ETF期权开户条件&#xff0c;怎么开期权交易权限&#xff1f;本文来…

死锁

目录 什么是死锁 产生的条件 死锁避免 银行家算法 问题引入 银行家算法的实现思想 死锁检测 每种类型一个资源的死锁检测 每种类型多个资源的死锁检测 死锁恢复 鸵鸟算法 什么是死锁 线程死锁是指由于两个或者多个线程互相持有对方所需要的资源&#xff0c;导致这些线…

3.3.2 【MySQL】客户端和服务器通信中的字符集

3.3.2.1 编码和解码使用的字符集不一致的后果 我们知道字符 我 在 utf8 字符集编码下的字节串长这样&#xff1a; 0xE68891 &#xff0c;如果一个程序把这个字节串发送到另一个程序里&#xff0c;另一个程序用不同的字符集去解码这个字节串&#xff0c;假设使用的是 gbk 字符集…

3ds max插件CG MAGIC中的室外功能可以高效出图吗?

使用3ds Max高效出图秘诀有没有什么秘诀呢&#xff1f;如何做到快速出图呢&#xff1f; 3ds max插件CG MAGIC中的室外功能可以高效出图吗&#xff1f; CG MAGIC 是一款基于3DS max深入开发的智能辅助设计插件。 自从CG Magic专业版上线之后&#xff0c;小伙伴们对新功能诀窍…