SparkStreaming与Kafka整合

1.3 SparkStreaming与Kafka整合

1.3.1 整合简述
kafka是做消息的缓存,数据和业务隔离操作的消息队列,而sparkstreaming是一款准实时流式计算框架,所以二者的整合,是大势所趋。
​
二者的整合,有主要的两大版本。

kafka作为一个实时的分布式消息队列,实时的生产和消费消息,在实际开发中Spark Streaming经常会结合Kafka来处理实时数据。Spark Streaming 与 kafka整合需要引入spark-streaming-kafka.jar,该jar根据kafka版本有2个分支,分别是spark-streaming-kafka-0-8 和 spark-streaming-kafka-0-10。jar包分支选择原则:

  • 0.10.0>kafka版本>=0.8.2.1,选择 08 接口

  • kafka版本>=0.10.0,选择 010 接口

sparkStreaming和Kafka整合一般两种方式:Receiver方式和Direct方式

Receiver方式(介绍)

Receiver方式基于kafka的高级消费者API实现(高级优点:高级API写起来简单;不需要去自行去管理offset,系统通过zookeeper自行管理;不需要管理分区,副本等情况,系统自动管理;消费者断线会自动根据上一次记录在 zookeeper中的offset去接着获取数据;高级缺点:不能自行控制 offset;不能细化控制如分区、副本、zk 等)。Receiver从kafka接收数据,存储在Executor中,Spark Streaming 定时生成任务来处理数据。

默认配置的情况,Receiver失败时有可能丢失数据。如果要保证数据的可靠性,需要开启预写式日志,简称WAL(Write Ahead Logs,Spark1.2引入),只有接收到的数据被持久化之后才会去更新Kafka中的消费位移。接收到的数据和WAL存储位置信息被可靠地存储,如果期间出现故障,这些信息被用来从错误中恢复,并继续处理数据。

还有几个需要注意的点:

  • 在Receiver的方式中,Spark中的 partition 和 kafka 中的 partition 并不是相关的,如果加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度;

  • 对于不同的 Group 和 Topic 可以使用多个 Receiver 创建不同的Dstream来并行接收数据,之后可以利用union来统一成一个Dstream;

  • 如果启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level需要设置成 StorageLevel.MEMORY_AND_DISK_SER,也就是:KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)

  • WAL将接收的数据备份到HDFS上,保证了数据的安全性。但写HDFS比较消耗性能,另外要在备份完数据之后还要写相关的元数据信息,这样总体上增加job的执行时间,增加了任务执行时间;

  • 总体上看 Receiver 方式,不适于生产环境;

1.3.2  Direct的方式
Direct方式从Spark1.3开始引入的,通过 KafkaUtils.createDirectStream 方法创建一个DStream对象,Direct方式的结构如下图所示。

Direct 方式特点如下:

  • 对应Kafka的版本 0.8.2.1+

  • Direct 方式

  • Offset 可自定义

  • 使用kafka低阶API

  • 底层实现为KafkaRDD

该方式中Kafka的一个分区与Spark RDD对应,通过定期扫描所订阅Kafka每个主题的每个分区的最新偏移量以确定当前批处理数据偏移范围。与Receiver方式相比,Direct方式不需要维护一份WAL数据,由Spark Streaming程序自己控制位移的处理,通常通过检查点机制处理消费位移,这样可以保证Kafka中的数据只会被Spark拉取一次

  • 引入依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.1.2</version>
</dependency>
  • 模拟kafka生产数据

package com.qianfeng.sparkstreaming
​
import java.util.{Properties, Random}
​
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
​
/*** 向kafka中test主题模拟生产数据;;;也可以使用命令行生产:kafka-console-producer.sh --broker-list qianfeng01:9092,hadoop02:9092,hadoop03:9092 -topic test*/
object Demo02_DataLoad2Kafka {def main(args: Array[String]): Unit = {val prop = new Properties()//提供Kafka服务器信息prop.put("bootstrap.servers","qianfeng01:9092")//指定响应的方式prop.put("acks","all")//请求失败重试的次数prop.put("retries","3")//指定key的序列化方式,key是用于存放数据对应的offsetprop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")//指定value的序列化方式prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")//创建producer对象val producer = new KafkaProducer[String,String](prop)//提供一个数组,数组中数据val arr = Array("hello tom","hello jerry","hello dabao","hello zhangsan","hello lisi","hello wangwu",)//提供一个随机数,随机获取数组中数据向kafka中进行发送存储val r = new Random()while(true){val message = arr(r.nextInt(arr.length))producer.send(new ProducerRecord[String,String]("test",message))Thread.sleep(r.nextInt(1000))   //休眠1s以内}}
}
  • 实时消费kafka数据

package com.qianfeng.sparkstreaming
​
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
​
​
/*** sparkStreaming消费Kafka中的数据*/
object Demo03_SparkStreamingWithKafka {def main(args: Array[String]): Unit = {//1.创建SparkConf对象val conf = new SparkConf().setAppName("SparkStreamingToKafka").setMaster("local[*]")//2.提供批次时间val time = Seconds(5)//3.提供StreamingContext对象val sc = new StreamingContext(conf, time)//4.提供Kafka配置参数val kafkaConfig = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "qianfeng01:9092",ConsumerConfig.GROUP_ID_CONFIG -> "qianfeng","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",)//5.读取Kafka中数据信息生成DStreamval value = KafkaUtils.createDirectStream(sc,//本地化策略:将Kafka的分区数据均匀的分配到各个执行Executor中LocationStrategies.PreferConsistent,//表示要从使用kafka进行消费【offset谁来管理,从那个位置开始消费数据】ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaConfig))//6.将每条消息kv获取出来val line: DStream[String] = value.map(record => record.value())//7.开始计算操作line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()//line.count().print()   //每隔5s的数据条数//8.开始任务sc.start()sc.awaitTermination()}
}
  • 说明

    1. 简化的并行性:不需要创建多个输入Kafka流并将其合并。 使用directStream,Spark Streaming将创建与使用Kafka分区一样多的RDD分区,这些分区将全部从Kafka并行读取数据。 所以在Kafka和RDD分区之间有一对一的映射关系。

    2. 效率:在第一种方法中实现零数据丢失需要将数据存储在预写日志中,这会进一步复制数据。这实际上是效率低下的,因为数据被有效地复制了两次:一次是Kafka,另一次是由预先写入日志(WriteAhead Log)复制。这个第二种方法消除了这个问题,因为没有接收器,因此不需要预先写入日志。只要Kafka数据保留时间足够长。

    3. 正好一次(Exactly-once)的语义:第一种方法使用Kafka的高级API来在Zookeeper中存储消耗的偏移量。传统上这是从Kafka消费数据的方式。虽然这种方法(结合提前写入日志)可以确保零数据丢失(即至少一次语义),但是在某些失败情况下,有一些记录可能会消费两次。发生这种情况是因为Spark Streaming可靠接收到的数据与Zookeeper跟踪的偏移之间的不一致。因此,在第二种方法中,我们使用不使用Zookeeper的简单Kafka API。在其检查点内,Spark Streaming跟踪偏移量。这消除了Spark Streaming和Zookeeper/Kafka之间的不一致,因此Spark Streaming每次记录都会在发生故障的情况下有效地收到一次。为了实现输出结果的一次语义,将数据保存到外部数据存储区的输出操作必须是幂等的,或者是保存结果和偏移量的原子事务。

Guff_hys_python数据结构,大数据开发学习,python实训项目-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/228506.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

webpack的深入学习与实战(持续更新)

一、何为Webpack Webpack是 一个开源的JavaScript模块打包工具&#xff0c;其最核心的功能是解决模块之间的依赖&#xff0c;把各个模块按照特定的规则和顺序组织在一起&#xff0c;最终合并为一个JS文件或多个。 二、带宽的换算 目前我们的云服务器带宽为5M 三 、bundle 体…

二叉树题目:根到叶路径上的不足结点

文章目录 题目标题和出处难度题目描述要求示例数据范围 解法思路和算法代码复杂度分析 题目 标题和出处 标题&#xff1a;根到叶路径上的不足结点 出处&#xff1a;1080. 根到叶路径上的不足结点 难度 6 级 题目描述 要求 给定二叉树的根结点 root \texttt{root} root…

maven中dependencyManagement标签

简介 dependencyManagement正如其名&#xff0c;用于项目依赖的统一管理。 在父项目中的pom.xml文件中加入dependencyManagement标签即可完成依赖版本的声明。在声明完成后&#xff0c;子项目&#xff08;module&#xff09;中引用相同的依赖时可以不指定version标签自动引入…

【python报错】UserWarning: train_labels has been renamed targets

UserWarning: train_labels has been renamed targetswarnings.warn(“train_labels has been renamed targets”) 这是一条 Python 警告信息&#xff0c;它表示 train_labels 这个变量已经被重命名为 targets&#xff0c;在将来的版本中可能会移除 train_labels。因此&#x…

51蛋骗鸡595级联1616点阵

缘由如何用单片机独立按键控制16*16LED点阵模块点亮和熄灭?求指导 - 24小时必答区 #include "reg52.h" unsigned char code DuLiAnJian[]{1,2,4,8,16,32,64,128,254,253,251,247,239,223,191,127}; unsigned char code CHARCODE[12][8]{ //{0xFD,0xFD,0x0D,0xED,0x…

如何开发一个google插件(二)

前言 在上一篇文章如何开发一个google插件(一)里主要介绍了google插件的基本结构。 在这篇文章中主要结合reactwebpack进行一个代码演示&#xff0c;源码地址&#xff1a;源码地址 下载源码后打开浏览器的扩展程序管理->加载已解压的扩展程序&#xff0c;即可调试插件 此…

在 Spring 中操作 Redis

&#x1f9f8;欢迎来到dream_ready的博客&#xff0c;&#x1f4dc;相信您对博主首页也很感兴趣o (ˉ▽ˉ&#xff1b;) &#x1f4dc;redis和缓存及相关问题和解决办法 什么是缓存预热、缓存穿透、缓存雪崩、缓存击穿 目录 1、引入依赖 2、对 Redis 的配置文件进行书写 3、S…

Python机器学习原理与算法实现中绘制散点图和线图的操作

作为对数据进行预处理的重要工具之一&#xff0c;散点图&#xff08;Scatter Diagram&#xff09;深受专家、学者们的喜爱。散点图的简要定义就是点在直角坐标系平面上的分布图。研究者对数据制作散点图的主要出发点是通过绘制该图来观察某变量随另一变量变化的大致趋势&#x…

小米SU7汽车发布会; 齐碳科技C+轮融资;网易 1 月 3 日发布子曰教育大模型;百度文心一言用户数已突破 1 亿

投融资 • 3200 家 VC 投资的创业公司破产&#xff0c;那个投 PLG 的 VC 宣布暂停投资了• 云天励飞参与 AI 技术与解决方案提供商智慧互通 Pre-IPO 轮融资• 百度投资 AIGC 公司必优科技• MicroLED量测公司点莘技术获数千万级融资• 智慧互通获AI上市公司云天励飞Pre-IPO轮战…

门控循环单元(GRU)-多输入回归预测

目录 一、程序及算法内容介绍&#xff1a; 基本内容&#xff1a; 亮点与优势&#xff1a; 二、实际运行效果&#xff1a; 三、部分程序&#xff1a; 四、全部代码数据分享&#xff1a; 一、程序及算法内容介绍&#xff1a; 基本内容&#xff1a; 本代码基于Matlab平台编译…

十大排序算法归纳

目录 排序算法的分类 插入排序算法模板 选择排序算法模板 冒泡排序算法模板 希尔排序算法模板 快速排序算法模板 归并排序算法模板 堆排序算法模板 基数排序算法模板 计算排序算法模板 桶排序算法模板 排序算法的分类 插入&#xff1a;插入&#xff0c;折半插入&am…

网站显示不安全警告怎么办?消除网站不安全警告超全指南

网站显示不安全警告怎么办&#xff1f;当用户访问你的网站&#xff0c;而您的网站没有部署SSL证书实现HTTPS加密时&#xff0c;网站就会显示不安全警告&#xff0c;这种警告&#xff0c;不仅有可能阻止用户继续浏览网站&#xff0c;影响网站声誉&#xff0c;还有可能影响网站在…

基于蜉蝣算法优化的Elman神经网络数据预测 - 附代码

基于蜉蝣算法优化的Elman神经网络数据预测 - 附代码 文章目录 基于蜉蝣算法优化的Elman神经网络数据预测 - 附代码1.Elman 神经网络结构2.Elman 神经用络学习过程3.电力负荷预测概述3.1 模型建立 4.基于蜉蝣优化的Elman网络5.测试结果6.参考文献7.Matlab代码 摘要&#xff1a;针…

操作系统(Operator System)

这里写目录标题 1. 什么是操作系统2. 主要功能3. 计算机的层状结构4. 什么叫做管理5. 总结6. 为什么要有操作系统7. 最后 1. 什么是操作系统 操作系统&#xff08;英语&#xff1a;Operating System&#xff0c;缩写&#xff1a;OS&#xff09;是一组主管并控制计算机操作、运…

彭涛:2023年终复盘,工作,团队,个人!

眨眼2023即将结束&#xff0c;2024即将开启&#xff0c;每年这个时候&#xff0c;都会简单总结下自己这一年&#xff0c;既是对今年的一个复盘和回顾&#xff0c;也是对新一年的向往和期待。 我的2023年&#xff0c;大概分为 「个人」&#xff0c;「家庭」&#xff0c;「团队」…

C语言实现RSA算法加解密

使用c语言实现了RSA加解密算法&#xff0c;可以加解密文件和字符串。 rsa算法原理 选择两个大素数p和q&#xff1b;计算n p * q;计算φ(n)(p-1)(q-1)&#xff1b;选择与φ(n)互素的整数d&#xff1b;由de1 mod φ(n)计算得到e&#xff1b;公钥是(e, n), 私钥是(d, n);假设明…

设计模式(4)--对象行为(11)--访问者

1. 意图 表示一个作用于某对象结构中的各元素的操作。 使你可以在不改变各元素的类的前提下定义于作用于这些元素的新操作。 2. 五种角色 抽象访问者(Visitor)、具体访问者(Concrete Visitor)、抽象元素(Element)、 具体元素(Concrete Element)、对象结构(ObjectStructure) 3…

12 HAL库的硬件SPI驱动数码管

引言&#xff1a; 本文将为大家介绍一下SPI&#xff0c; 数码管的知识&#xff0c; 以及HAL库驱动SPI接口的数码的代码示例。 一、SPI的基础知识 1. SPI简介 01 SPI是串行外设接口&#xff08;Serial Peripheral Interface&#xff09;的缩写 02 是美国摩托罗拉公司&#xff08…

【ARMv8M Cortex-M33 系列 2 -- Cortex-M33 JLink 连接 及 JFlash 烧写介绍】

请阅读【嵌入式开发学习必备专栏 之Cortex-M33 专栏】 文章目录 Jlink 工具JLink 命令行示例JFlash 烧写问题Jlink 工具 J-Link 是 SEGGER 提供的一款流行的 JTAG 调试器,它支持多个平台和处理器。JLink.exe 是 J-Link 调试器的命令行接口,它允许用户通过命令行执行一系列操…

微信小程序开发系列-11组件间通信02

微信小程序开发系列目录 《微信小程序开发系列-01创建一个最小的小程序项目》 《微信小程序开发系列-02注册小程序》 《微信小程序开发系列-03全局配置中的“window”和“tabBar”》 《微信小程序开发系列-04获取用户图像和昵称》 《微信小程序开发系列-05登录小程序》 《…