流批一体计算引擎-4-[Flink]消费kafka实时数据

Python3.6.9 Flink 1.15.2消费Kafaka Topic
PyFlink基础应用之kafka
通过PyFlink作业处理Kafka数据

1 环境准备

1.1 启动kafka

(1)启动zookeeper
zkServer.sh start(2)启动kafka
cd /usr/local/kafka/
nohup ./bin/kafka-server-start.sh ./config/server.properties >> /tmp/kafkaoutput.log 2>&1 &
或者
./bin/kafka-server-start.sh -daemon ./config/server0.properties(3)查看进程如下
jps
10101 QuorumPeerMain
11047 Kafka(4)kafka tools配置
C:\Windows\System32\drivers\etc\hosts(5)查看日志文件
/tmp/kafkaoutput.log或者/usr/local/kafka/logs(6)创建Topic主题
bin/kafka-topics.sh --create --zookeeper localhost:2181 -replication-factor 1 --partitions 1 --topic flink_kafakasource(7)查看当前创建的Topic
bin/kafka-topics.sh --list --zookeeper localhost:2181(8)查看kafka版本
kafka_2.12-2.2.0.jar
可以看出scala的版本是2.12,kafka的版本是2.2.0

1.2 启动Flink

(1)启动flink
start-cluster.sh(2)查看是否启用成功
jps
4704 TaskManagerRunner
4443 StandaloneSessionClusterEntrypoint(3)关闭Flink
stop-cluster.sh

1.3 安装PyFlink

PyFlink需要特定的Python版本,Python 3.6, 3.7, 3.8 or 3.9。

1.3.1 python3和pip3的配置

一、系统中安装了多个版本的python3

编译安装python时
其中–prefix选项是配置安装的路径,
若是不配置该选项,安装后可执行文件默认放在/usr/local/bin,
库文件默认放在/usr/local/lib,
配置文件默认放在/usr/local/etc,
其它的资源文件放在/usr /local/share,比较凌乱。/usr/local/bin/python3.6m 
/usr/local/bin/python3.6m-config 
/usr/include/python3.6m/usr/local/bin/python3.6 
/usr/local/bin/python3.6-config 
/usr/local/lib/python3.6 /usr/local/bin/python3.10 
/usr/local/bin/python3.10-config
/usr/local/lib/python3.10 

二、环境变量path作用顺序

#echo $PATH
/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/usr/local/jdk1.8.0_144/bin:/root/bin:/home/data/java/bin:/home/data/java/jre/bin
按照顺序进行显示

三、安装Pyflink

ln -s 源文件 目标文件
ln -s /usr/local/bin/python3.6 /usr/local/bin/python3
ln -s /usr/local/bin/pip3.6 /usr/local/bin/pip3/usr/local/bin/python3.6 -m pip install --upgrade pip
pip3 install apache-flink==1.15.3 -i http://pypi.douban.com/simple --trusted-host pypi.douban.com包文件安装后的位置
/usr/local/lib/python3.6/site-packages

1.3.2 配置Flink Kafka连接

(1)在https://mvnrepository.com/里输入flink kafka寻找对应版本的连接器
在这里插入图片描述

(2)选择Flink对应的版本1.15.3,点击jar
在这里插入图片描述
在这里插入图片描述

(3)分别下载flink-connector-base和kafka-clients对应的jar包
在这里插入图片描述

(4)将该jar包放置在python的lib目录下
/usr/local/lib/python3.6/dist-packages/pyflink/lib。
在这里插入图片描述

(5)将该jar包放置在Flink的lib目录下
拷贝三个jar包到FLINK_HOME/lib下。
在这里插入图片描述

2 消费kafka写入本地文件

2.1 flinkDemo.py

本应用采用pyflink+sql方式编写代码。

# -*- coding: UTF-8 -*-
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, CsvTableSink, WriteMode, SqlDialects_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
s_env.enable_checkpointing(3000)
st_env = StreamTableEnvironment.create(s_env)  # , TableConfig())
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")
sourceKafkaDdl = """
create table sourceKafka(
id int,name varchar
)
with('connector'='kafka','topic'='flink_kafakasource','properties.bootstrap.servers'='192.168.43.48:9092','scan.startup.mode'='latest-offset','format'='json'
)
"""
st_env.execute_sql(sourceKafkaDdl)
fieldNames = ["id", "name"]
fieldTypes = [DataTypes.INT(), DataTypes.STRING()]
csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result.csv", ",", 1, WriteMode.OVERWRITE)
st_env.register_table_sink("csvTableSink", csvSink)
st_env.execute_sql("""INSERT INTO csvTableSinkselect * from sourceKafka
""").wait()

2.2 执行方式

2.2.1 方式一:直接IDEA中运行

无需安装flink。

(1)安装pyflink
pip3 install apache-flink==1.15.3(2)配置pycharm的flink环境:
首先最重要的是版本问题,这里给出我的相关版本配置
kafka:2.2.0
jdk:1.8.0_201
apache-flink: 1.15.3
相应的jar包版本。
flink-connector-base-1.15.3.jar
flink-connector-kafka-1.15.3.jar
kafka-clients-2.8.1.jar将jar包放入External Libraries下的site-packages下的pyflink下的lib中。(3)运行
python3 flinkDemo.py

2.2.2 方式二:命令行提交到Flink

/usr/local/flink-1.15.3/bin/flink run -py flinkDemo.py
或
/usr/local/flink-1.15.3/bin/flink run --python flinkDemo.py
显示如下:
Job has been submitted with JobID 1f3d2ffc0b0c5f9274040fd008a5ec17

在这里插入图片描述

2.3 模拟数据

打开kafka生产者,通过客户端生产数据。
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink_kafakasource
{"id":2,"name":"查询kafka后存储到cvs文件中"}

2.4 查看Flink侧结果

在这里插入图片描述

3 消费kafka写入kafka

直接本地IDEA中运行即可。

# -*- coding: UTF-8 -*-
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, CsvTableSink, WriteMode, SqlDialects_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
s_env.enable_checkpointing(3000)
st_env = StreamTableEnvironment.create(s_env)  # , TableConfig())
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")sourceKafkaDdl = """
create table sourceKafka(
id int,name varchar
)
with('connector'='kafka','topic'='flink_kafakasource','properties.bootstrap.servers'='192.168.43.48:9092','scan.startup.mode'='latest-offset','format'='json'
)
"""sinkKafkaDdl = """
create table sinkKafka(
id int,name varchar
)
with('connector'='kafka','topic'='result','properties.bootstrap.servers'='192.168.43.48:9092','scan.startup.mode'='latest-offset','format'='json'
)
"""
st_env.execute_sql(sourceKafkaDdl)
st_env.execute_sql(sinkKafkaDdl)st_env.execute_sql("""INSERT INTO sinkKafkaselect * from sourceKafka
""").wait()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/39679.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【仿牛客网笔记】 Kafka,构建TB级异步消息系统——发送系统通知、显示系统通知

定义时间主题 判断消息内容是否为空,消息格式是否错误。 系统通知是后台发给用户 发送站内通知 构造一个Message对象 设置站内的值 判断是否有数据,然后放入到message中 对CommentController、LikeController、FollowController进行处理。 需要注入…

kafka集群压测与优化

影响kafka集群性能的因数有多个,网络带宽、cpu、内存、磁盘读写速度、副本数、分区数、broker数量、内存缓存等因素都会影响kafka集群的性能 1.优化kafka集群配置 server.properties配置文件优化 num.network.threads4 num.io.threads4 socket.send.buffer.bytes…

熵,信息熵,香农熵,微分熵,交叉熵,相对熵

2019-07-13 https://blog.csdn.net/landstream/article/details/82383503 https://blog.csdn.net/pipisorry/article/details/51695283 https://www.zhihu.com/question/41252833 https://cloud.tencent.com/developer/article/1397504 按顺序查看更容易理解 0、背景 在信息论中…

【Hive】云任务大量卡住故障分析

项目场景: 上一章节我们简单介绍到了JVM调优相关的知识,本章节结合日常故障处理进一步说明相关的使用 问题描述 在云上,hive任务出现大面积卡住的现象,但并无任何报错信息,具体如下: 原因分析&#xff1…

Linux服务器出现异常和卡顿排查思路和步骤

目录 前言一、查看内存使用情况二、查看磁盘使用情况三、top命令3.1 jmap分析堆内存配置信息和使用情况3.2 jstack分析线程的执行情况3.3 jstat查看各个区域占堆百分比 四、其他指令总结 前言 Linux 服务器出现异常和卡顿的原因有很多,以下是一些常见的原因&#x…

熵、交叉熵和散度

熵 自信息 I(x) - log p(x) 对于分布为P(x)的随机变量X,自信息的数学期望 即熵H(X)定义为: 熵越高,随机变量信息越高,反之越少。不同概率分布对应熵如下:P p()熵10001/21/41/41/31/31/3 概率分布越均匀&#xff0…

【腾讯轻量应用服务器上部署kafka并通过flink读取kafka数据】

环境准备 经过1个月的摸索,最终选择在腾讯云上搭建一个学习环境。当时选择原因还是新用户有优惠(150左右3年),但现在看1核2g的配置勉强够用,建议后续小伙伴选择时最好是2核4g配置。 由于是单节点安装,需要准备如下资源&#xff1…

【Twitter Storm系列】flume-ng+Kafka+Storm+HDFS 实时系统搭建

技术交流群:59701880 深圳广州hadoop好友会 微信公众号:后续博客的文档都会转到微信公众号中。 一直以来都想接触Storm实时计算这块的东西,最近在群里看到上海一哥们罗宝写的FlumeKafkaStorm的实时日志流系统的搭建文档,自己也跟…

学习笔记之信息量、熵、KL散度、交叉熵的一些介绍

文章目录 信息量熵KL散度(相对熵)交叉熵参考 信息量 以前我也一直只是知道信息量的计算公式,也有想过为什么会是这样,但是因为要学的东西太多了,就没怎么深究,直到看了“交叉熵”如何做损失函数&#xff1…

AI产品经理是如何理解机器学习的(附注释)

本期目的:回顾之前敲代码的日常,利用生活场景帮助AI产品经理认识机器学习及学习过程中常用算法和评价指标。 适用读者:AI/数据产品经理。 阅读重点:品,细品,结合班级故事再品。 最近看了很多关于机器学习…

卢伟冰称小米今年库存会回到更健康水位 将加大门店整合调整

雷递网 乐天 3月25日 小米集团(股票代码为:1810)今日发布财报,财报显示,小米集团2022年营收为2800亿元,较上年同期的3283亿元下降14.7%。小米2022年经调净利为85亿元。 小米2022年第四季度营收为660.47亿元…

小米年营收2800亿:经调整利润85亿 电动汽车业务投入31亿

雷递网 雷建平 3月24日 小米集团(股票代码为:1810)今日发布财报,财报显示,小米集团2022年营收为2800亿元,较上年同期的3283亿元下降14.7%。 2022年,小米集团的境外市场收入为1378亿元&#xff0…

什么是低代码开发平台(apaas)?低代码开发平台的价值有哪些

手码6500字,带你快速看懂:什么是低代码开发平台(apaas),低代码有哪些价值,以及低代码平台的使用逻辑和心得。 一、什么是低代码开发平台(apaas)? 低代码开发平台是一种a…

详解如何在ChatGPT内构建一个Python解释器

这篇文章主要为大家详细介绍了如何在ChatGPT内构建一个Python解释器,文中的示例代码讲解详细,具有一定的学习价值,需要的可以参考一下 目录 引用:Art Kulakov 《How to Build a Python Interpreter Inside ChatGPT》 这个灵感来自…

如何画各种“图”?

我们在写文档过程中,多多少少都会画一些图来说明程序的功能,这篇文章就来说明一些常用的"图"怎么画。 1,画图工具 常用的有这么几种 Visio:这是很常用的,基本上的图都可以用这个来画。StarUML&#xff1a…

patreon cg人物插画作品合集分享

1、wlop大神作品4K-8K精选无水印图片http://theme.chengxuz.com/265.html 2、加拿大女画师sakimi chan作品1-98期插画http://theme.chengxuz.com/250.html 3、画师Mirco Cabbia插画作品合集欣赏http://theme.chengxuz.com/306.html 4、画师Sciamano240插画作品分享http://th…

如何画场景插画?场景插画的起稿、构图技巧!

如何画场景插画?绘画初学者如何构图?绘画初学者如何起稿?学习绘画难吗?怎样才能学好绘画?想必这些都是绘画初学者们经常在想的问题吧,就是不知道如何才能绘画好一个场景,不知道如何起稿也不知道…

手把手教你实现手绘风格图形

大家好,我是 漫步,今天分享一篇高难度的图形绘制文章。 Rough.js[1]是一个手绘风格的图形库,提供了一些基本图形的绘制能力,比如:虽然笔者是个糙汉子,但是对这种可爱的东西都没啥抵抗力,这个库的…

Python自动绘制UML类图、函数调用图(Call Graph)

文章目录 1. 引言2. 绘制UML类图2.1 安装graphviz2.2 安装pyreverse2.3 绘制UML类图 3. 绘制函数调用图3.1 安装graphviz3.2 安装pycallgraph3.3 使用示例第一种:从命令行调用第二种:从API调用 小结 1. 引言 在设计软件、分析代码时,我们常常…

房价预测2

学习: https://blog.csdn.net/u012063773/article/details/79349256 https://www.cnblogs.com/massquantity/p/8640991.html https://zhuanlan.zhihu.com/p/39429689 详解stacking过程 之前在房价预测1中对一些异常值进行了drop处理 后来在分割train和test的时候…