用pyspark把kafka主题数据经过etl导入另一个主题中的有关报错

首先看一下我们的示例代码

import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
"""
------------------------------------------Description : TODO:SourceFile : etl_stream_kafkaAuthor  : zxxDate  : 2024/11/14
-------------------------------------------
"""
if __name__ == '__main__':os.environ['JAVA_HOME'] = 'D:/bigdata/03-java/java-8/jdk'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/bigdata/04-Hadoop/hadoop/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'D:/bigdata/22-spark/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'D:/bigdata/22-spark/Miniconda3/python.exe'spark = SparkSession.builder.master("local[2]").appName("etl_stream_kafka").config("spark.sql.shuffle.partitions", 2).getOrCreate()# 连接kafkareadDF = spark.readStream.format("kafka") \.option("kafka.bootstrap.servers", "bigdata01:9092") \.option("subscribe", "topicA") \.load()# 使用DSL语句etlDF = readDF.selectExpr("cast(value as STRING)").filter(F.col("value").contains("success"))etlDF.writeStream \.format("kafka") \.option("kafka.bootstrap.servers", "bigdata01:9092") \.option("topic", "etlTopic") \.option("checkpointLocation", "../../datas/kafka_stream") \.start().awaitTermination()# 关闭spark.stop()

运行发现报错

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):File "D:\bigdata\18-python\pyspark_project\pythonProject1\main\streamingkafka\etl_stream_kafka.py", line 22, in <module>readDF = spark.readStream.format("kafka") \File "D:\bigdata\22-spark\Miniconda3\lib\site-packages\pyspark\sql\streaming.py", line 482, in loadreturn self._df(self._jreader.load())File "D:\bigdata\22-spark\Miniconda3\lib\site-packages\py4j\java_gateway.py", line 1304, in __call__return_value = get_return_value(File "D:\bigdata\22-spark\Miniconda3\lib\site-packages\pyspark\sql\utils.py", line 117, in decoraise converted from None
pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

报错 : org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;

解决:这个是因为缺少了Kafka和Spark的集成包,前往https://mvnrepository.com/artifact/org.apache.spark

下载对应的jar包即可,比如我是SparkSql写入的Kafka,那么我就需要下载Spark-Sql-Kafka.x.x.x.jar

 进入网站(已打包放入文章末尾)

找到对应有关spark 和kafka的模块

找到对应的版本 ,这里我用的kafka是3.0版本,下载的是3.1.2版本

 点进去,下载jar包

 再次运行会发现仍然报错,这是因为jar包之间的依赖关系,从刚才下载的界面下面再下载有关的jar包

 

 

 

 再次运行即可

 jar包下载链接

【免费】用pyspark把数据从kafka的一个主题用流处理后再导入kafka的另一个主题的有关报错资源-CSDN文库

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/476198.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

什么是反向 DNS 查找以及它的作用是什么?

反向DNS查询&#xff08;rDNS&#xff09;是一种技术&#xff0c;用于确定与某个IP地址对应的域名。当我们对一个IP地址进行反向DNS查询时&#xff0c;实际上是向域名系统&#xff08;DNS&#xff09;的特殊部分请求信息&#xff0c;这部分被称为PTR记录。PTR记录会返回与这个I…

HarmonyOS鸿蒙系统上File文件常用操作

HarmonyOS鸿蒙系统上&#xff0c;file文件常用操作记录 1.创建文件 createFile(fileName: string, content: string): string {// 获取应用文件路径let context getContext(this) as common.UIAbilityContext;let filesDirPath context.filesDir / fileName;// 新建并打开…

音视频pts/dts

现在的视频流有两个非常重要的时间戳&#xff0c;pts和dts&#xff0c;其中pts是显示的时候用&#xff0c;dts在解码的时候用。 pts很好理解&#xff0c;按照pts的顺序以及duration不间断的display就可以了。 dts在解码的时候用&#xff0c;那么这句话怎么理解&#xff0c;解…

输出比较简介

输出比较简介 主要是用来输出PWM波形&#xff0c;这个波形是驱动电机的&#xff08;智能车和机器人等&#xff09;必要条件 OC&#xff08;Output Compare&#xff09;输出比较&#xff0c;还有IC&#xff0c;全称是Input Capture&#xff0c;意为输入捕获&#xff0c;还有CC…

揭秘AIGC下的数字时代:交互设计的隐秘力量与未来革命

在当今数字化时代&#xff0c;交互设计已经成为我们日常生活中不可或缺的一部分。它不仅仅是关于产品或服务的界面设计&#xff0c;更是关于如何通过这些界面与人进行有效的沟通和互动。本文将探讨交互设计的深层含义、面临的挑战以及其对未来科技发展的影响。 文章来源&#x…

使用node-addon-api实现从c到nodejs模块全流程

目录 1 前言 2 安装nodejs 3 安装开发工具链 3.1 安装node-gyp 3.2 安装编译工具链&#xff08;C/C 编译器&#xff09; 4 初始化 Node.js 项目 4.1 创建项目目录 4.2 初始化 package.json 4.3 安装必要的库 5 编写代码 5.1 创建项目结构 5.2 编写动态库代码 5.3 编…

Python3.11.9+selenium,获取图片验证码以及输入验证码数字

Python3.11.9+selenium,获取图片验证码以及输入验证码数字 1、遇到问题:登录或修改密码需要验证码 2、解决办法: 2.1、安装ddddocr pip install ddddocr 2.2、解析验证码函数 import ddddocr def get_capcha_text():#获取验证码图片ele_pic = driver.find_element(By.XPAT…

测试工程师如何在面试中脱颖而出

目录 1.平时工作中是怎么去测的&#xff1f; 2.B/S架构和C/S架构区别 3.B/S架构的系统从哪些点去测&#xff1f; 4.你为什么能够做测试这一行&#xff1f;&#xff08;根据个人情况分析理解&#xff09; 5.你认为测试的目的是什么&#xff1f; 6.软件测试的流程&#xff…

css水平居中+垂直居中

display:“flex”,position: “absolute”,top:“50%”,left:“50%”,transform: ‘translate(-50%, -50%)’

Linux 服务器使用指南:从入门到登录

&#x1f31f;快来参与讨论&#x1f4ac;&#xff0c;点赞&#x1f44d;、收藏⭐、分享&#x1f4e4;&#xff0c;共创活力社区。 &#x1f31f; &#x1f6a9;博主致力于用通俗易懂且不失专业性的文字&#xff0c;讲解计算机领域那些看似枯燥的知识点&#x1f6a9; 目录 一…

99.【C语言】数据结构之二叉树的基本知识

目录 1.树的定义 树是递归定义的 一些细碎的概念 2.树的判断法则 树结点结构的定义 自然想到的定义方法 左孩子右兄弟定义 3.树的应用:文件系统 4.树的特殊形式:二叉树 5.特殊的两类二叉树 满二叉树 完全二叉树 完全二叉树和满二叉树之间的关系 高度为h的完全二叉…

Bug:引入Feign后触发了2次、4次ContextRefreshedEvent

Bug&#xff1a;引入Feign后发现监控onApplication中ContextRefreshedEvent事件触发了2次或者4次。 【原理】在Spring的文档注释中提示到&#xff1a; Event raised when an {code ApplicationContext} gets initialized or refreshed.即当 ApplicationContext 进行初始化或者刷…

Ubuntu20.04从零安装IsaacSim/IsaacLab

Ubuntu20.04从零安装IsaacSim/IsaacLab 电脑硬件配置&#xff1a;安装Isaac sim方案一&#xff1a;pip安装方案二&#xff1a;预构建二进制文件安装1、安装ominiverse2、在ominiverse中安装isaac sim&#xff0c;下载最新的4.2版本 安装Isaac Lab1、IsaacLab环境克隆2、创建con…

低速接口项目之串口Uart开发(二)——FIFO实现串口数据的收发回环测试

本节目录 一、设计思路 二、loop环回模块 三、仿真模块 四、仿真验证 五、上板验证 六、往期文章链接本节内容 一、设计思路 串口数据的收发回环测试&#xff0c;最简单的硬件测试是把Tx和Rx连接在一起&#xff0c;然后上位机进行发送和接收测试&#xff0c;但是需要考虑到串…

算法编程题-排序

算法编程题-排序 比较型排序算法冒泡排序选择排序插入排序希尔排序堆排序快速排序归并排序 非比较型排序算法计数排序基数排序 本文将对七中经典比较型排序算法进行介绍&#xff0c;并且给出golang语言的实现&#xff0c;还包括基数排序、计数排序等非比较型的算法的介绍和实现…

【软考】系统架构设计师-信息系统基础

#信息系统基础核心知识点 信息系统5个基本功能&#xff1a;输入、存储、处理、输出和控制 诺兰模型&#xff1a;信息系统计划的阶段模型&#xff0c;6阶段 初始阶段&#xff0c;传播阶段&#xff0c;控制阶段&#xff0c;集成阶段&#xff0c;数据管理阶段&#xff0c;成熟阶…

【架构】主流企业架构Zachman、ToGAF、FEA、DoDAF介绍

文章目录 前言一、Zachman架构二、ToGAF架构三、FEA架构四、DoDAF 前言 企业架构&#xff08;Enterprise Architecture&#xff0c;EA&#xff09;是指企业在信息技术和业务流程方面的整体设计和规划。 最近接触到“企业架构”这个概念&#xff0c;转念一想必定和我们软件架构…

使用低成本的蓝牙HID硬件模拟鼠标和键盘来实现自动化脚本

做过自动化脚本的都知道&#xff0c;现在很多传统的自动化脚本方案几乎都可以被检测&#xff0c;比如基于root&#xff0c;adb等方案。用外置的带有鼠标和键盘功能集的蓝牙HID硬件来直接点击和滑动是非常靠谱的方案&#xff0c;也是未来的趋势所在。 一、使用蓝牙HID硬件的优势…

数据结构-二叉树_堆

目录 1.二叉树的概念 ​编辑1.1树的概念与结构 1.2树的相关语 1.3 树的表示 2. ⼆叉树 2.1 概念与结构 2.2 特殊的⼆叉树 2.2.2 完全⼆叉树 2.3 ⼆叉树存储结构 2.3.1 顺序结构 2.3.2 链式结构 3. 实现顺序结构⼆叉树 3.2 堆的实现 3.2.2 向下调整算法 1.二叉树的概…

【FPGA开发】AXI-Full总线接口介绍、FPGA搭建仿真平台

文章目录 协议解读接口介绍AW—写地址通道W—写数据通道B—写响应通道AR—读地址通道R—读数据通道 FPGA搭建仿真平台 本文主要介绍AXI-FULL的相关基础内容&#xff0c;AXI-Lite请移步&#xff1a; 【FPGA开发】AXI-Lite总线协议解读、Verilog逻辑开发与仿真、Alex Forencich代…