大数据学习之SparkStreaming、PB级百战出行网约车项目一

一.SparkStreaming

163.SparkStreaming概述

Spark Streaming is an extension of the core Spark API that
enables scalable, high-throughput, fault-tolerant stream
processing of live data streams.
Spark Streaming 是核心 Spark API 的扩展,支持实时数据流的
可扩展、高吞吐量、容错流处理。
Spark Streaming 用于流式数据的处理。 Spark Streaming 支持
的数据输入源很多,例如: Kafka Flume HDFS Kinesis TCP
套接字等等。数据输入后可以用 Spark 的高级函数(如 map
reduce join window 等进行运算。而结果也能保存在很多地方,
HDFS ,数据库和实时仪表板等。还可以可以在数据流上应用
Spark 的机器学习和图形处理算法。
Spark Streaming 接收实时输入数据流,并将数据分为多个批
次,然后由 Spark 引擎进行处理,以批量生成最终结果流。在内部,
它的工作原理如下:

164.SparkStreaming_架构

背压机制 ( 了解 ) Spark 1.5 以前版本,用户如果要限制 Receiver 的数据接收速
率,可以通过设置静态配制参数
“spark.streaming.receiver.maxRate” 的值来实现,此举虽然可以通
过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会
引入其它问题。比如: producer 数据生产高于 maxRate ,当前集群
处理能力也高于 maxRate ,这就会造成资源利用率下降等问题。
为了更好的协调数据接收速率与资源处理能力, 1.5 版本开始
Spark Streaming 可以动态控制数据接收速率来适配集群数据处理
能力。背压机制(即 Spark Streaming Backpressure : 根据
JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收
率。
通过属性 “spark.streaming.backpressure.enabled” 来控制是
否启用 backpressure 机制,默认值 false ,即不启用。

165.SparkStreaming_创建项目

<dependency>
<groupId> org.apache.spark </groupId>
<artifactId> spark-core_2.12 </artifactId>
<version> 3.2.1 </version>
</dependency>
<dependency>
<groupId> org.apache.spark </groupId>
<artifactId> spark
streaming_2.12 </artifactId>
<version> 3.2.1 </version>
</dependency>

166.SparkStreaming_WORDCOUNT

package com . itbaizhan . streaming
import org . apache . spark . SparkConf
import org . apache . spark . streaming . dstream .
{ DStream , ReceiverInputDStream }
import org . apache . spark . streaming .{ Seconds ,
StreamingContext }
object StreamingWordCount {
def main ( args : Array [ String ]): Unit = {
//1. 初始化 SparkConf 类的对象
val conf : SparkConf = new SparkConf ()
    . setMaster ( "local[*]" )
    . setAppName ( "StreamingWordCount" )
//2. 创建 StreamingContext 对象
val ssc = new StreamingContext ( conf ,
Seconds ( 5 ))
//3. 通过监控 node1 9999 端口创建 DStream 对象
val lines : ReceiverInputDStream [ String ]
=
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
7 测试
1
node1
2
IDEA 中运行程序
3
node1
4
查看 IDEA 控制台
ssc . socketTextStream ( "node1" , 9999 )
//4. 将每一行数据做切分,形成一个个单词
val wordsDS : DStream [ String ] =
lines . flatMap ( _ . split ( " " ))
//5.word=>(word,1)
val wordOne : DStream [( String , Int )] =
wordsDS . map (( _ , 1 ))
//6. 将相同的 key value 做聚合加
val wordCount : DStream [( String , Int )] =
wordOne . reduceByKey ( _ + _ )
//7. 打印输出
wordCount . print ()
//8. 启动
ssc . start ()
//9. 等待执行停止
ssc . awaitTermination ()
}
}

167.SparkStreaming_数据抽象

168.SparkStreaming_RDD队列创建DSTREAM

169.SparkStreaming_自定义数据源一

需求:自定义数据源,实现监控指定的端口号,获取该端口号
内容。
需要继承 Receiver ,并实现 onStart onStop 方法来自定义数据源采集。
package com . itbaizhan . streaming
import org . apache . spark . storage . StorageLevel
import
org . apache . spark . streaming . receiver . Receiver
import java . io .{ BufferedReader ,
InputStreamReader }
import java . net . Socket
import java . nio . charset . StandardCharsets
1
2
3
4
5
6
7
8
9
13 class ReceiverCustomer ( host : String , port :
Int ) extends Receiver [ String ]
( StorageLevel . MEMORY_ONLY ) {
// 最初启动的时候,调用该方法
// 作用:读数据并将数据发送给 Spark
override def onStart (): Unit = {
new Thread ( "Socket Receiver" ) {
override def run () {
receive ()
    }
  }. start ()
}
override def onStop (): Unit = {}
// 读数据并将数据发送给 Spark
def receive (): Unit = {
// 创建一个 Socket
var socket : Socket = new Socket ( host ,
port )
// 定义一个变量,用来接收端口传过来的数据
var input : String = null
// 创建一个 BufferedReader 用于读取端口传来的数
val reader = new BufferedReader ( new
InputStreamReader ( socket . getInputStream ,
StandardCharsets . UTF_8 ))
// 读取数据
input = reader . readLine ()
// receiver 没有关闭并且输入数据不为空,则循环
发送数据给 Spark
while ( ! isStopped () && input != null ) {
store ( input )
input = reader . readLine ()
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
14 使用自定义的数据源采集数据
  }
// 跳出循环则关闭资源
reader . close ()
socket . close ()
// 重启任务
restart ( "restart" )
}
}

170.SparkStreaming_自定义数据源二

package com . itbaizhan . streaming
import org . apache . spark . SparkConf
import org . apache . spark . streaming .{ Seconds ,
StreamingContext }
object CustomerSource {
def main ( args : Array [ String ]): Unit = {
//1. 初始化 Spark 配置信息
val sparkConf = new SparkConf ()
    . setMaster ( "local[*]" )
    . setAppName ( "CustomerSource" )
//2. 初始化
val ssc = new
StreamingContext ( sparkConf , Seconds ( 5 ))
//3. 创建自定义 receiver Streaming
val lines = ssc . receiverStream ( new
ReceiverCustomer ( "node1" , 9999 ))
lines . print ()
//4. 启动
ssc . start ()
ssc . awaitTermination ()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
15 测试
1
node1
2
IDEA 中运行程序
3
node1
4
查看 IDEA 控制台
实时效果反馈
1. 关于 SparkStreaming 接收器自定义数据源的描述,错误的
是:
A
需要继承 Receiver ,并实现 onStart onStop 方法来自定义
数据源采集。
B
Xxx extends Receiver[String](StorageLevel.MEMORY_ONLY)
接收到数据仅保存在
内存中。
C
onStart()
最初启动的时候,调用该方法;作用是读数据并将数
据发给 Spark
D
onStop()
不能空实现。
答案:
1=>D 可以空实现
SparkStreaming_DStream 无状态转换
}
}
20
21
[root@node1 ~] # nc -lk 9999
1
[root@node1 ~] # nc -lk 9999
aa
bb
cc

171.SparkStreaming_DSTREAM无状态转换

172.SparkStreaming_DSTREAM无状态转换transform

173.SparkStreaming_DSTREAM有状态转换

174.SparkStreaming_窗口操作reducebykeyandwidow概述

//reduceFunc– 结合和交换 reduce 函数
//windowDuration– 窗口长度;必须是此数据流批处理间
隔的倍数
//slideDuration– 窗口的滑动间隔 , 即新数据流生成 RDD
的间隔
def reduceByKeyAndWindow (
reduceFunc : ( V , V ) => V ,
windowDuration : Duration ,
slideDuration : Duration
): DStream [( K , V )] = ssc . withScope {
//partitioner– 用于控制新数据流中每个 RDD 分区的
分区器
reduceByKeyAndWindow ( reduceFunc ,
windowDuration , slideDuration ,
defaultPartitioner ())
}

175.SparkStreaming_窗口操作reducebykeyandwidow实战

176.SparkStreaming_窗口操作reducebykeyandwidow优化

177.SparkStreaming_窗口操作WINDOW

178.SparkStreaming_输出

179.SparkStreaming_优雅关闭一

流式任务需要 7*24 小时执行,但是有时涉及到升级代码需要主
动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所
以配置优雅的关闭就显得至关重要了。使用外部文件系统来控制内
部程序关闭。
package com . itbaizhan . streaming
import org . apache . spark . SparkConf
import
org . apache . spark . streaming . dstream . ReceiverI
nputDStream
import org . apache . spark . streaming .{ Seconds ,
StreamingContext }
object StreamingStopDemo {
def createSSC (): StreamingContext = {
val sparkConf : SparkConf = new
SparkConf (). setMaster ( "local[*]" ). setAppName
( "StreamingStop" )
// 设置优雅的关闭
sparkConf . set ( "spark.streaming.stopGraceful
lyOnShutdown" , "true" )
1
2
3
4
5
6
7
8
9
34    
val ssc = new
StreamingContext ( sparkConf , Seconds ( 5 ))
ssc . checkpoint ( "./ckp" )
ssc
}
def main ( args : Array [ String ]): Unit = {
val ssc : StreamingContext =
StreamingContext . getActiveOrCreate ( "./ckp" ,
() => createSSC ())
new Thread ( new
StreamingStop ( ssc )). start ()
val line : ReceiverInputDStream [ String ] =
ssc . socketTextStream ( "node1" , 9999 )
line . print ()
ssc . start ()
ssc . awaitTermination ()
}
}
10
11
12
13
14
15
16
17
18
19
20
21
22
package com . itbaizhan . streaming
import org . apache . hadoop . conf . Configuration
import org . apache . hadoop . fs .{ FileSystem ,
Path }
import org . apache . spark . streaming .
{ StreamingContext , StreamingContextState }
import java . net . URI
class StreamingStop ( ssc : StreamingContext )
extends Runnable {
override def run (): Unit = {
val fs : FileSystem = FileSystem . get ( new
URI ( "hdfs://node2:9820" ),
new Configuration (), "root" )
1
2
3
4
5
6
7
8
9
35 测试
1
启动 hadoop 集群
2
node1 上: nc -lk 9999
3
运行程序
4
node2
5
node1 上:
while ( true ) {
try
Thread . sleep ( 5000 )
catch {
case e : InterruptedException =>
e . printStackTrace ()
    }
val state : StreamingContextState =
ssc . getState
if ( state ==
StreamingContextState . ACTIVE ) {
val bool : Boolean = fs . exists ( new
Path ( "hdfs://node2:9820/stopSpark" ))
if ( bool ) {
ssc . stop ( stopSparkContext = true ,
stopGracefully = true )
System . exit ( 0 )
      }
    }
  }
}
}

180.SparkStreaming_优雅关闭二

181.SparkStreaming_优雅关闭测试

182.SparkStreaming_整合KAFKA模式

183.SparkStreaming_整合kafka开发一

导入依赖:
代码编写:
<dependency>
<groupId> org.apache.spark </groupId>
<artifactId> spark-streaming-kafka-0-
10_2.12 </artifactId>
<version> 3.2.1 </version>
</dependency>
<dependency>
<groupId> com.fasterxml.jackson.core </groupI
d>
<artifactId> jackson-core </artifactId>
<version> 2.12.7 </version>
</dependency>
1
2
3
4
5
6
7
8
9
10
package com . itbaizhan . streaming
import org . apache . kafka . clients . consumer .
{ ConsumerConfig , ConsumerRecord }
import org . apache . spark . SparkConf
import org . apache . spark . streaming . dstream .
{ DStream , InputDStream }
1
2
3
4
40 import org . apache . spark . streaming . kafka010 .
{ ConsumerStrategies , KafkaUtils ,
LocationStrategies }
import org . apache . spark . streaming .{ Seconds ,
StreamingContext }
object DirectAPIDemo {
def main ( args : Array [ String ]): Unit = {
//1. 创建 SparkConf
val sparkConf : SparkConf = new
SparkConf ()
    . setMaster ( "local[*]" )
    . setAppName ( "DirectAPIDemo" )
//2. 创建 StreamingContext
val ssc = new
StreamingContext ( sparkConf , Seconds ( 3 ))
//3. 定义 Kafka 参数
val kafkaPara : Map [ String , Object ] =
Map [ String , Object ](
ConsumerConfig . BOOTSTRAP_SERVERS_CONFIG ->
"node2:9092,node3:9092,node4:9092" ,
ConsumerConfig . GROUP_ID_CONFIG ->
"itbaizhan" ,
"key.deserializer" ->
"org.apache.kafka.common.serialization.Strin
gDeserializer" ,
"value.deserializer" ->
"org.apache.kafka.common.serialization.Strin
gDeserializer"
  )
//4. 读取 Kafka 数据创建 DStream
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
41 SparkStreaming_ 整合 Kafka 测试
val kafkaDStream :
InputDStream [ ConsumerRecord [ String , String ]]
=
KafkaUtils . createDirectStream [ String ,
String ]( ssc ,
// 由框架自动选择位置匹配
LocationStrategies . PreferConsistent ,
// 消费者策略 主题: topicKafka,kafka
数: kafkaPara
ConsumerStrategies . Subscribe [ String ,
String ]( Set ( "topicKafka" ), kafkaPara ))
//5. 将每条消息的 KV 取出
//val valueDStream: DStream[String] =
kafkaDStream.map(record => record.value())
val valueDStream : DStream [ String ] =
kafkaDStream . map ( _ . value ())
//6. 计算 WordCount
valueDStream . flatMap ( _ . split ( " " ))
    . map (( _ , 1 ))
    . reduceByKey ( _ + _ )
    . print ()
//7. 开启任务
ssc . start ()
ssc . awaitTermination ()
}
}

184.SparkStreaming_整合kafka开发二

185.SparkStreaming_整合kafka测试

二.PB级百战出行网约车项目一

1.百战出行

项目需求分析
数据采集平台搭建
1
订单数据实时分析计算乘车人数和订单数
2
虚拟车站
3
订单交易数据统计分析
4
司机数据统计分析
5
用户数据统计分析
6
1 名称
框架
数据采集传输
MaxWell Kafka
数据存储
Hbase MySQL Redis
数据计算
Spark
数据库可视化
Echarts
项目技术点
掌握数据从终端 (APP) 的产生到数据中台处理再到大数据后台处理的整个链路技术。
1
Spark 自定义数据源实现 HBase 数据进行剪枝分析计算。
2
基于 Phoenix 实战海量数据秒查询。
3
平台新用户和留存用户分析。
4
空间索引算法 Uber h3 分析与蜂窝六边形区域订单分析。

2.百战出行架构设计

3.环境搭建_HBASE安装部署

4.环境搭建_KAFKA安装部署

5.环境搭建_MYSQL安装部署

6.环境搭建_REDIS安装部署

7.构建父工程

8.订单监控_收集订单数据

9.订单监控_订单数据分析

10.订单监控_存储数据之读取数据

11.订单监控_存储数据之保持数据至MYSQL

12.订单监控_MAXWELL介绍

13.订单监控_MAXWELL安装

14.订单监控_SPARK_STREAMING整合KAFKA_上

15.订单监控_SPARK_STREAMING整合KAFKA_下

16.订单监控_实时统计订单总数之消费订单数据

17.订单监控_实时统计订单总数之构建订单解析器

18.订单监控_实时统计订单总数之解析订单JSON数据

19.订单监控_实时统计订单总数

20.订单监控_实时统计乘车人数统计

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/18282.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Elasticsearch】Mapping概述

以下是Elasticsearch中提到的关于Mapping的各模块概述&#xff1a; --- 1.Dynamic mapping&#xff08;动态映射&#xff09; 动态映射是指Elasticsearch在索引文档时&#xff0c;自动检测字段类型并创建字段映射的过程。当你首次索引一个文档时&#xff0c;Elasticsearch会根…

如何构建一个AI驱动的前端UI组件生成器

前言 本文将教您如何构建一个AI驱动的前端UI组件生成器&#xff0c;它可以帮助您生成Next.js Tailwind CSS UI组件&#xff0c;并提供实现教程。我们将涵盖以下内容&#xff1a; 使用Next.js、TypeScript和Tailwind CSS构建UI组件生成器Web应用程序。 使用CopilotKit将AI功能…

无耳科技 Solon v3.0.8 发布,Java 企业级应用开发框架

Solon 框架&#xff01; Solon 是新一代&#xff0c;Java 企业级应用开发框架。是杭州无耳科技有限公司的“根级”开源项目&#xff08;最近“杭州六小龙”很火啊&#xff0c;我们也是杭州的哦&#xff09;。从零开始构建&#xff08;No Spring、No Java-EE、No Servlet&#…

Linux | 进程相关概念(进程、进程状态、进程优先级、环境变量、进程地址空间)

文章目录 进程概念1、冯诺依曼体系结构2、进程2.1基本概念2.2描述进程-PCB2.3组织进程2.4查看进程2.5通过系统调用获取进程标识符2.6通过系统调用创建进程-fork初识fork の 头文件与返回值fork函数的调用逻辑和底层逻辑 3、进程状态3.1状态3.2进程状态查看命令3.2.1 ps命令3.2.…

超越DeepSeek R1的Moe开源大模型 Qwen2.5-max 和 Qwen Chat Web UI 的发布,阿里搅动AI生态

敲黑板&#xff0c;说重点&#xff0c;最近阿里推出的 Qwen2.5-max 和 Qwen Chat Web UI&#xff0c;将对AI生态又一次冲击。 说冲击&#xff0c;因为 DeepSeek R1的热潮还未散退的情况下&#xff0c;由于服务器压力不能注册新的API&#xff0c;然后价格涨价&#xff0c;服务有…

无公网IP可实现外网访问开发速查备忘录 Quick Reference

Quick Reference 是一款为开发人员准备的快速参考和备忘清单&#xff0c;涵盖了各种编程语言、框架、工具和命令行工具的常用语法和用法。目的就是为了开发人员在开发时方便技术栈查阅&#xff0c;提高开发者的开发效率。 本文将详细的介绍如何利用 Docker 在本地部署 Quick Re…

【ARM】JTAG接口介绍

1、 文档目标 对 JTAG 接口有更多的认识&#xff0c;在遇到关于 JTAG 接口问题时有一些排查的思路。 2、 问题场景 在使用调试器过程时&#xff0c;免不了要接触到 JTAG 接口&#xff0c;当出现连接不上时&#xff0c;就不知道从哪来进行排查。 3、软硬件环境 1 软件版本&am…

两步在 Vite 中配置 Tailwindcss

第一步&#xff1a;安装依赖 npm i -D tailwindcss tailwindcss/vite第二步&#xff1a;引入 tailwindcss 更改配置 // src/main.js import tailwindcss/index// vite.config.js import vue from vitejs/plugin-vue import tailwindcss from tailwindcss/viteexport default …

Threadlocal的实现原理

文章目录 ThreadLocal与Thread关系分析Threadlocal 不支持继承性lnheritableThreadLocal 类 ThreadLocal与Thread关系分析 由该图可知&#xff0c; Thread 类中有一个 threadLocals 和一个 inheritableThreadLocals &#xff0c; 它们 都是 ThreadLocalMap 类型 的变量 &#x…

arm linux下的中断处理过程。

本文基于ast2600 soc来阐述&#xff0c;内核版本为5.10 1.中断gic初始化 start_kernel() -> init_IRQ() -> irqchip_init() of_irq_init()主要是构建of_intc_desc. 489-514: 从__irqchip_of_table中找到dts node中匹配的of_table(匹配matches->compatible)&#xf…

oracle使用动态sql将多层级组织展平

ERP或者其他企业管理软件中都会有一张组织机构表&#xff0c;可以写固定sql的方式将其展平获取组织表中的字段信息&#xff0c;如负责人、上级组织负责人、分管领导、成立时间等。但是这种方式有个缺陷&#xff0c;就是如果只写到处理4个层级&#xff0c;那么后期层级增多就无法…

layui怎么请求数据

layui怎么请求数据 ​编辑 下次还敢 发布&#xff1a; 2024-04-04 03:30:19 原创 1152人浏览过 Layui 提供四种数据请求方式&#xff1a;$.ajax() Ajax 方式Fetch API 方式layui 内置 Ajax 方式layui 内置请求方式&#xff0c;用于监听提交事件 Layui中请求数据的几种方式…

mybatis-plus逆向code generator pgsql实践

mybatis-plus逆向code generator pgsql实践 环境准备重要工具的版本供参考pom依赖待逆向的SQL 配置文件CodeGenerator配置类配置类说明 环境准备 重要工具的版本 jdk1.8.0_131springboot 2.7.6mybatis-plus 3.5.7pgsql 14.15 供参考pom依赖 <?xml version"1.0&quo…

【IoTDB 线上小课 11】为什么 DeepSeek 要选择开源?

新年新气象&#xff0c;【IoTDB 视频小课】第十一期全新来临&#xff01; 关于 IoTDB&#xff0c;关于物联网&#xff0c;关于时序数据库&#xff0c;关于开源... 一个问题重点&#xff0c;3-5 分钟&#xff0c;我们讲给你听&#xff1a; 开源“加成”再次展现&#xff01; 现在…

Java面试宝典:说下Spring Bean的生命周期?

Java面试宝典专栏范围&#xff1a;JAVA基础&#xff0c;面向对象编程&#xff08;OOP&#xff09;&#xff0c;异常处理&#xff0c;集合框架&#xff0c;Java I/O&#xff0c;多线程编程&#xff0c;设计模式&#xff0c;网络编程&#xff0c;框架和工具等全方位面试题详解 每…

web自动化-浏览器驱动下载

web-UI自动化最终要的一步就是下载安装浏览器驱动&#xff0c;下面是常用浏览器驱动的下载安装地址&#xff0c;以及安装之后如何验证的方法&#xff1a; 一、查看浏览器版本号 通过selenium进行自动化测试过程中&#xff0c;浏览器驱动的版本必须要和浏览器的版本保持一致&am…

PDF另存为图片的一个方法

说明 有时需要把PDF的每一页另存为图片。用Devexpress可以很方便的完成这个功能。 窗体上放置一个PdfViewer。 然后循环每一页 for (int i 1; i < pdfViewer1.PageCount; i) 调用 chg_pdf_to_bmp函数获得图片并保存 chg_pdf_to_bmp中调用了PdfViewer的CreateBitmap函数…

easyexcel快速使用

1.easyexcel EasyExcel是一个基于ava的简单、省内存的读写Excel的开源项目。在尽可能节约内存的情况下支持读写百M的Excel 即通过java完成对excel的读写操作&#xff0c; 上传下载 2.easyexcel写操作 把java类中的对象写入到excel表格中 步骤 1.引入依赖 <depen…

opencv中minAreaRect函数输出角度问题

opencv中minAreaRect函数输出角度问题 新版opencv中minAreaRect函数计算最小外接矩形时&#xff0c;角度范围由旧版的[-90, 0]变为[0, 90]。 cv2.minAreaRect输入&#xff1a;四边形的四个点&#xff08;不要求顺序&#xff09;。 输出&#xff1a;最小外接矩形的中心点坐标x…

Python Pandas(7):Pandas 数据清洗

数据清洗是对一些没有用的数据进行处理的过程。很多数据集存在数据缺失、数据格式错误、错误数据或重复数据的情况&#xff0c;如果要使数据分析更加准确&#xff0c;就需要对这些没有用的数据进行处理。数据清洗与预处理的常见步骤&#xff1a; 缺失值处理&#xff1a;识别并…