streamlit 实现 flink SQL运行界面

实现效果

在这里插入图片描述

streamlit

flink-playground.py 文件如下:

import streamlit as st
import io
import contextlib
import sys
import os
import uuid
import subprocess
from jinja2 import Templatest.set_page_config(layout="wide")# 设置页面标题
st.title("Flink SQL")# 初始化会话状态
if 'user_id' not in st.session_state:st.session_state.user_id = str(uuid.uuid4())# 创建一个输入框用于配置 JobManager 地址
st.session_state.jobmanager_address = st.text_input("JobManager 地址", value="10.50.108.7:48085")
# 创建一个文本框用于输入配置项
default_config = """
execution.checkpointing.interval=10s
execution.runtime-mode=batch
sql-client.execution.result-mode=table
sql-client.execution.max-table-result.rows=10000
pipeline.auto-watermark-interval=200
pipeline.max-parallelism=10
table.exec.state.ttl=1000
restart-strategy.type=fixed-delay
table.optimizer.join-reorder-enabled=true
table.exec.spill-compression.enabled=true
table.exec.spill-compression.block-size=128kb
""".strip()
st.session_state.config_input = st.text_area("输入配置项 (格式: key=value)", height=300, value=default_config)# 创建一个大的文本框用于输入代码
st.session_state.sql_input = st.text_area("输入你的 Flink SQL 代码", height=500)# 创建一个按钮
if st.button("执行 Flink Job"):try:# 读取模板文件with open("/work/template/pyflink-job.py.template", "r") as template_file:template_content = template_file.read()# 解析配置项config_dict = {}for line in st.session_state.config_input.splitlines():if '=' in line:key, value = line.split('=', 1)config_dict[key.strip()] = value.strip()# 使用 jinja2 模板引擎渲染模板template = Template(template_content)job_content = template.render(sqls=st.session_state.sql_input, config_items=config_dict)st.text("完整pyflink任务代码")st.code(job_content, language='python')# 将替换后的内容保存到临时文件file_name = f"flink_job_{st.session_state.user_id}.py"with open(file_name, "w") as job_file:job_file.write(job_content)# 使用 subprocess 执行 flink run 命令,并传递 JobManager 地址command = f"flink run -m {st.session_state.jobmanager_address} -py {file_name}"result = subprocess.run(command, shell=True, capture_output=True, text=True)# 获取捕获的输出captured_output = result.stdout# 显示输出结果st.text_area("执行结果", value=captured_output, height=200)except Exception as e:# 如果代码执行出错,打印错误信息st.error(f"代码执行出错: {e}")finally:# 删除临时文件if file_name and os.path.exists(file_name):os.remove(file_name)

运行:

nohup streamlit run /work/flink-playground.py --server.port 9999 2>&1  > .streamlit.log &

模板文件

模板文件根据用户输入动态更新任务配置和SQL

import re
from pyflink.table import EnvironmentSettings, TableEnvironmentdef remove_comments(sql):# 使用正则表达式删除单行注释和多行注释sql = re.sub(r'--.*$', '', sql, flags=re.MULTILINE)  # 删除单行注释sql = re.sub(r'/\*.*?\*/', '', sql, flags=re.DOTALL)  # 删除多行注释return sqldef execute_sql_file(table_env, sql_statements):sql_statements = sql_statements.split(';')for sql in sql_statements:# 删除注释sql = remove_comments(sql)sql = sql.strip()if sql:print(f"Executing SQL: {sql}")result = table_env.execute_sql(sql)# if result:#     result.print()def main():# 创建 TableEnvironmentenv_settings = EnvironmentSettings.new_instance().in_batch_mode().build()table_env = TableEnvironment.create(env_settings)table_config = table_env.get_config(){% for key, value in config_items.items() %}table_config.get_configuration().set_string("{{ key }}", "{{ value }}"){% endfor %}sqls = """{{ sqls }}"""# 读取 SQL 文件并执行execute_sql_file(table_env, sqls)if __name__ == "__main__":main()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/455528.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SL3160 dcdc150V降压5.1V/1A 车载GPS定位器供电芯片

一、主要特性 宽输入电压范围:SL3160支持10~150V的宽输入电压范围,使其能够适应各种电源电压波动,确保稳定输出。 高效降压转换:该芯片采用先进的电源管理技术,转换效率高达90%以上,降低了散热压力和整体…

点云标注工具开发记录(五)之点云文件加载、视角转换

在Open3D中,通过read方法,我们可以读取不同格式的点云数据,那么,在不使用Open3D的相关接口时,我们就需要自己重写文件读入、加载、渲染展示方法,效果如下: 点云文件读入 首先,我们要…

vue开发的一个小插件vue.js devtools

可打开谷歌商城的情况下,不可打开的可以到极简插件里面去下载 极简插件官网_Chrome插件下载_Chrome浏览器应用商店 搜索vue即可

Flutter仿京东商城APP实战 用户中心基础布局

用户中心界面 pages/tabs/user/user.dart import package:flutter/material.dart; import package:jdshop/utils/zdp_screen.dart; import package:provider/provider.dart;import ../../../store/counter_store.dart;class UserPage extends StatefulWidget {const UserPage…

Maven入门到实践:从安装到项目构建与IDEA集成

目录 1. Maven的概念 1.1 什么是Maven 1.2 什么是依赖管理 1.3 什么是项目构建 1.4 Maven的应用场景 1.5 为什么使用Maven 1.6 Maven模型 2.初识Maven 2.1 Maven安装 2.1.1 安装准备 2.1.2 Maven安装目录分析 2.1.3 Maven的环境变量 2.2 Maven的第一个项目 2.2.1…

古埃及象形文字在线字典

我在个人网站“小孔的埃及学站点”上推出了在线的象形文字字典,总共收罗了将近700条的象形文字(词)。在线字典的使用方法很简单,在网站各大版块首页的右上方会有如下图所示的查询入口。 点击文本框,输入中文或英文关键…

公交IC卡收单管理系统 assets 信息泄露

0x01 产品描述: 公交IC卡系统是公交一卡通系统核心建设部分,是高时尚、高科技的管理系统,大大提升了公交行业的服务,能让公交企业信息化和电子化打下一个良好的硬件基础和软件基0x02 漏洞描述: 公交IC卡系统在/assets/…

FRIDA-JSAPI:Instruction使用

官方API文档介绍 Instruction.parse(target) 解析内存中 target 地址处的指令。 返回的对象具有的字段: address: 此指令的地址(EIP),类型为 NativePointernext: 指向下一条指令的指针,您可以使用 parse() 解析它size…

详解如何使用WGCLOUD监测日志文件

WGCLOUD可以监控日志文件,包括.log、.txt、.out等类型的文件 WGCLOUD既可以监测文件夹下按天生成的日志文件,也可以监控指定的日志文件,非常灵活 我们只需要设置好日志中出现什么关键字符,那么WGCLOUD就可以自动进行这些监控工作…

【react 和 vue】 ---- 实现组件的递归渲染

1. 需求场景 今天遇到了一个需求,就是 HTML 的递归渲染。问题就是商品的可用时间,使用规则等数据是后端配置,然后配置规则则是可以无限递归的往下配置,可以存在很多级。后端实现后,数据返回前端,就需要前端…

一招教你解决Facebook广告账号问题

这段时间,我们写了很多文章来探讨Facebook的广告账户问题:《Facebook被封号该怎么办》《Facebook二不限、三不限账号是什么》《Facebook海外户(三不限)和账单户该如何选择》《如何区分真假Facebook三不限海外户》相信看过这些文章…

【传知代码】智能推荐与隐私保护的融合(论文复现)

本文将深入探讨这样一种系统的设计理念、关键技术以及其在实际应用中的潜力和优势。通过探索如何在保证个性化推荐效果的同时,有效保护用户隐私,我们将揭示出一种新兴的技术趋势,为未来智能化应用的发展开辟新的可能性。 目录 概述 项目设计…

基于SSM+小程序的就业管理系统(就业1)

👉文末查看项目功能视频演示获取源码sql脚本视频导入教程视频 1、项目介绍 学生实习与就业管理系统的设计与实现管理员、辅导员管理、企业管理、工作管理人、用户管理5个角色。 1、管理员实现了基础数据管理、辅导员管理、企业管理、工作管理人管理、公告信息管理…

js 填充数组

let arr Array.from({ length: 10 }, (_, index) > index)console.log(arr) 人工智能学习网站 https://chat.xutongbao.top

如何用示波器检测次级点火系统(一)

写在最前面: 单看标题可能会让你觉得这篇文章的主题是关于检测线圈,火花塞和火花塞插头电线。但我们指的是分析燃烧室内电子的行为。目标是看燃料混合物,阀座,压缩,积碳和其它影响这种特性的症状。最终目的是要学会分…

FIR数字滤波器在MATLAB中的实现

摘要 数字滤波器是由数字乘法器、加法器和延时单元组成的一种装置。数字滤波器的功能是对输入离散信号的数字代码进行运算处理,以达到改变信号频谱的目的。近年来数字滤波在通信、图像编码、语言编码、雷达等许多领域中有着十分广泛的应用。 本文首先介绍了数字滤波…

为什么诺贝尔物理学奖颁给了 AI 大神

瑞典皇家科学院刚宣布,科学家约翰霍普菲尔德(John J. Hopfield) 和杰弗里辛顿 (Geoffrey E. Hinton) 荣膺 2024年诺贝尔物理学奖,以表彰他们通过人工神经网络 (ANN) 实现机器学习而作出的基础性发现和发明 (for foundational discoveries and…

程序员:代码世界的探险家与日常“救火队员”

在这个被数字与代码编织的时代,程序员,这一群看似平凡却又不凡的“数字工匠”,正用他们的智慧与汗水,构建着我们生活的每一个角落。值此1024程序员节之际,让我们以轻松幽默的方式,一同走进程序员的世界&…

8轴/4轴的EtherCAT轴模块EIO24088G-V2及EIO16084G的使用(一):TwinCAT总线配置与使用

上节课给大家介绍了 EIO24088-V2及EIO16084结合RTSys进行总线配置与使用,详情请点击→8轴/4轴的EtherCAT轴模块EIO24088-V2及EIO16084的使用(一):RTSys总线配置与使用。 今天正运动给大家分享一下EIO24088G-V2及EIO16084G如何用T…

DNS安全概述

一、DNS的解析过程 1.递归解析 递归解析是一种由DNS客户端(通常是用户的应用程序,如一个浏览器)向本地DNS解析器发出解析请求,然后本地DNS解析器负责查询最终结果并将结果返回给客户端,而中间的所有查询请求都由本地D…