Spark读取kafka(流式和批数据)

spark读取kafka(批数据处理)

在这里插入图片描述

在这里插入图片描述

# 按照偏移量读取kafka数据
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()# spark读取kafka
options = {# 写kafka配置信息# 指定kafka的连接的broker服务节点信息'kafka.bootstrap.servers': 'node1:9092',# 指定主题'subscribe': 'itcast',# 读取的主题不存在会自动创建# todo 注意一:连接的配置#       主题名称 ,分区编号,偏移量# 指定起始偏移量   {主题名称:{分区编号0:偏移量,分区编号1:偏移量....}}'startingOffsets':""" {"itcast":{"0":0,"1":1}} """,# 指定结束偏移量  {主题名称:{分区编号0:偏移量,分区编号1:偏移量....}}'endingOffsets':""" {"itcast":{"0":3,"1":2}}  """# 注意点  : 偏移量的区间是左闭右开 ,结束偏移的指定按照最大偏移量加一 ,所有分区都要指定
}
# 读取
# format 指定读取kafka
df = ss.read.load(format='kafka',**options)
# todo 注意二:这一步的数据处理(将value转化为字符串类型)是必须做的,不然你看不懂数据。
#       可以用df.的方式,那我后来怎么都没怎么见过了0
df_select = df.select('key',df.value.cast('string'),'topic','partition','offset','timestamp','timestampType')
# 查看df数据
# todo 注意三:这里使用.show()的方式的,是因为它是有界表
df_select.show()

在这里插入图片描述

spark读取kafka(流数据处理)

在这里插入图片描述

# 流式读取kafka数据
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()
# todo 注意一:定义kafka的连接配置
options={# 写kafka配置信息# 指定kafka的连接的broker服务节点信息'kafka.bootstrap.servers': 'node1:9092',# 指定主题'subscribe': 'itheima'  # 读取的主题不存在会自动创建
}
df = ss.readStream.load(format='kafka',**options)
# todo 注意二:必须将value转化为string类型# 计算
df_res = df.select('key',df.value.cast('string'),'topic','partition','offset','timestamp')# 输出
# todo 注意三:输出不是df_res.show,
df_res.writeStream.start(format='console',outputMode='append').awaitTermination()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/242409.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言练习day8

变种水仙花 变种水仙花_牛客题霸_牛客网 题目: 思路:我们拿到题目的第一步可以先看一看题目给的例子,1461这个数被从中间拆成了两部分:1和461,14和61,146和1,不知道看到这大家有没有觉得很熟…

适合多种语言的BPE(Byte-Pair Encoding)编码

文章目录 前言BPE参考 前言 因为最近在看T5,里面讲到一些分词的方法如BEP,因为现在都是在玩大模型,那么语料也就都很大,而且还需要适配不同的语言,而不同的语言又不一定像英文那样按空格切分就行,例如咱们…

小程序学习-19

Vant Weapp - 轻量、可靠的小程序 UI 组件库 ​​​​​ Vant Weapp - 轻量、可靠的小程序 UI 组件库 安装出现问题:rollbackFailedOptional: verb npm-session 53699a8e64f465b9 解决办法:http://t.csdnimg.cn/rGUbe Vant Weapp - 轻量、可靠的小程序…

【C++干货铺】C++11新特性——lambda表达式 | 包装器

个人主页点击直达:小白不是程序媛 C系列专栏:C干货铺 代码仓库:Gitee 目录 C98中的排序 lambda表达式 lambda表达式语法 表达式中的各部分说明 lambda表达式的使用 基本的使用 [var]值传递捕捉变量var ​编辑 [&var]引用传递捕…

VC++中使用OpenCV进行颜色检测

VC中使用OpenCV进行颜色检测 在VC中使用OpenCV进行颜色检测非常简单,首选读取一张彩色图像,并调用函数cvtColor(img, imgHSV, COLOR_BGR2HSV);函数将原图img转换成HSV图像imgHSV,再设置好HSV三个分量的上限和下限值,调用inRange函…

在WIN从零开始在QMUE上添加一块自己的开发板(二)

文章目录 一、前言往期回顾 二、CPU虚拟化(一)相关源码(二)举个例子(三)测试 三、内存虚拟化(一)相关源码(二)举个例子测试 参考资料 一、前言 笔者这篇博客…

雷盛红酒LEESON分享葡萄酒也有“社会责任感”?

葡萄酒文化从来都不仅仅是感官体验,一瓶佳酿的背后不但蕴含着风土人情、历史传承和文化交流,更反映了时代社会的变迁以及体现的社会责任意识。 目前葡萄酒生产商追求酒瓶越来越轻就是葡萄酒市场上的一个趋势,因为任何一个行业都在追求与世界共…

c语言算法——大数相加

C数据类型 类型与描述1基本数据类型 它们是算术类型,包括整型(int)、字符型(char)、浮点型(float)和双精度浮点型(double)。2枚举类型: 它们也是算术类型&am…

Vue2的双向数据绑定

Vue2的双向数据绑定 Observer:观察者,这里的主要工作是递归地监听对象上的所有属性,在属性值改变的时候,触发相应的watcher。 Watcher:订阅者,当监听的数据值修改时,执行响应的回调函数&#x…

Spring:StopWatch

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 目录 一、输出总耗时 二、输出所有任务的耗时和占比 总结 提示:以下是本篇文章正文内容,下面案例可供参考 一、输出总耗时 public void stopWatc…

ERP进出库+办公用品管理系统

系统架构 简介系统架构部分页面结构图UML逻辑图办公用品入出库 简介 本系统适用于ERP企业公司职员关于系统化的申请相关办公用品,提高整体系统整合行,加大上下级之间的联系,规避因人员过多,而浪费人力在简单重复的工作中&#xf…

conda国内加速

1、配置国内源 conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/ 2、显示源地址 conda config --set show_channel_urls yes

【MongoDB】下载安装、指令操作

目录 1.下载安装 2.指令 2.1.基础操作指令 2.2.增加 2.3.查询 2.4.修改 2.5.删除 前言: 关于MongoDB的核心概念请移步: 【文档数据库】ES和MongoDB的对比-CSDN博客 1.下载安装 本文以安装Windows版本的mongodb为例,Linux版本的其实…

30岁的路口,这些90后选择离开大城市

#第一批90后今年34岁了#【30岁的路口,这些90后选择离开大城市】#第一批90后现状如何# 据惊蛰研究所:第一批90后今年34岁了。假如从2012年踏入职场,第一批90后如今已在职场摸爬滚打十年。十年之前,他们意气风发来到大城市&#xff…

go语言(十一)----面向对象继承

一、面向对象继承 写一个父类 package mainimport "fmt"type Human struct {name stringsex string }func (this *Human) Eat() {fmt.Println("Human.Eat()...") }func (this *Human) Walk() {fmt.Println("Human.Walk()...") }func main() {h…

开源项目_大模型应用_Chat2DB

1 基本信息 项目地址:https://github.com/chat2db/Chat2DBStar:10.7K 2 功能 Chat2DB 是一个智能且多功能的 SQL 客户端和报表工具,适用于各种数据库。 对于那些平时会用到数据库,但又不是数据库专家的程序员来说,…

CISSP 2024年考试大纲中文版

2024 CISSP详细内容大纲及权重最终版(仅供公众使用) 最后编辑于2023年8月18日-生效日期2024年4月15日 分类 域/任务/子任务 权重 域1 安全和风险管理 16% 1.1 理解、坚持和促进职业道德(2-4项) 1.1.1 ISC2职业道德守则 1.1.2 组织道德守则 1.2 理解并应用安全概…

【MATLAB源码-第119期】基于matlab的GMSK系统1bit差分解调误码率曲线仿真,输出各个节点的波形以及功率谱。

操作环境: MATLAB 2022a 1、算法描述 GMSK(高斯最小频移键控)是一种数字调制技术,广泛应用于移动通信,例如GSM网络。它是一种连续相位调频制式,通过改变载波的相位来传输数据。GMSK的关键特点是其频谱的…

华为FusionStorage Block、OceanStor 100D、OceanStor pacific的区别

华为FusionStorage Block、OceanStor 100D、OceanStor pacific的区别? 华为块存储到底是叫什么呢? 有接触过华为块存储产品的小伙伴肯定都有疑惑,在FusionStorage 、FusionStorage Block、OceanStor 100D、OceanStor pacific等等的名词中&a…

【JSON2WEB】01 WEB管理信息系统架构设计

WEB管理信息系统分三层设计,分别为DataBase数据库、REST2SQL后端、JSON2WEB前端,三层都可以单独部署。 1 DataBase数据库 数据库根据需要选型即可,不需要自己设计开发,一般管理信息系统都选关系数据库,比如Oracle、…