Flink项目实战篇 基于Flink的城市交通监控平台(上)

系列文章目录

Flink项目实战篇 基于Flink的城市交通监控平台(上)
Flink项目实战篇 基于Flink的城市交通监控平台(下)


文章目录

  • 系列文章目录
  • 1. 项目整体介绍
    • 1.1 项目架构
    • 1.2 项目数据流
    • 1.3 项目主要模块
  • 2. 项目数据字典
    • 2.1 卡口车辆采集数据
    • 2.2 城市交通管理数据表
    • 2.3 车辆轨迹数据表
  • 3. 实时卡口监控分析
    • 3.1 创建Maven项目
    • 3.2 准备数据
    • 3.3 实时车辆超速监控
    • 3.4 实时卡口拥堵情况监控
    • 3.5 实时最通畅的TopN卡口


1. 项目整体介绍

近几年来,随着国内经济的快速发展,高速公路建设步伐不断加快,全国机动车辆、驾驶员数量迅速增长,交通管理工作日益繁重,压力与日俱增。为了提高公安交通管理工作的科学化、现代化水平,缓解警力不足,加强和保障道路交通的安全、有序和畅通,减少道路交通违法和事故的发生,全国各地建设和使用了大量的“电子警察”、“高清卡口”、“固定式测速”、“区间测速”、“便携式测速”、“视频监控”、“预警系统”、“能见度天气监测系统”、“LED信息发布系统”等交通监控系统设备。尽管修建了大量的交通设施,增加了诸多前端监控设备,但交通拥挤阻塞、交通安全状况仍然十分严重。由于道路上交通监测设备种类和生产厂家繁多,目前还没有一个统一的数据采集和交换标准,无法对所有的设备、数据进行统一、高效的管理和应用,造成各种设备和管理软件混用的局面,给使用单位带来了很多不便,使得国家大量的基础建设投资未达到预期的效果。各交警支队的设备大都采用本地的数据库管理,交警总队无法看到各支队的监测设备及监测信息,严重影响对全省交通监测的宏观管理;目前网络状况为设备专网、互联网、公安网并存的复杂情况,需要充分考虑公安网的安全性,同时要保证数据的集中式管理;监控数据需要与“六合一”平台、全国机动车稽查布控系统等的数据对接,迫切需要一个全盘考虑面向交警交通行业的智慧交通管控指挥平台系统。

智慧交通管控指挥平台建成后,达到了以下效果目标:

  • 交通监视和疏导:通过系统将监视区域内的现场图像传回指挥中心,使管理人员直接掌握车辆排队、堵塞、信号灯等交通状况,及时调整信号配时或通过其他手段来疏导交通,改变交通流的分布,以达到缓解交通堵塞的目的。
  • 交通警卫:通过突发事件的跟踪,提高处置突发事件的能力。
  • 建立公路事故、事件预警系统的指标体系及多类分析预警模型,实现对高速公路通行环境、交通运输对象、交通运输行为的综合分析和预警,建立真正意义上的分析及预警体系。
  • 及时准确地掌握所监视路口、路段周围的车辆、行人的流量、交通治安情况等,为指挥人员提供迅速直观的信息从而对交通事故和交通堵塞做出准确判断并及时响应。
  • 收集、处理各类公路网动静态交通安全信息,分析研判交通安全态势和事故隐患,并进行可视化展示和预警提示。
  • 提供接口与其他平台信息共享和关联应用,基于各类动静态信息的大数据分析处理,实现交通违法信息的互联互通、源头监管等功能。

1.1 项目架构

本项目是与公安交通管理综合应用平台、机动车缉查布控系统等对接的,并且基于交通部门现有的数据平台上,进行的数据实时分析项目。
在这里插入图片描述

  • 卡口:道路上用于监控的某个点,可能是十字路口,也可能是高速出口等。
    在这里插入图片描述

  • 通道:每个卡口上有多个摄像头,每个摄像头有拍摄的方向。这些摄像头也叫通道。

  • “违法王“车辆: 该车辆违法未处理超过50次以上的车。

  • 摄像头拍照识别:
    (1)一次拍照识别:经过卡口摄像头进行的识别,识别对象的车辆号牌信息、车辆号牌颜色信息等,基于车辆号牌和车辆颜色信息,能够实现基本的违法行为辨识、车辆黑白名单比对报警等功能。
    (2)二次拍照识别:可以通过时间差和距离自动计算出车辆的速度。

1.2 项目数据流

在这里插入图片描述
实时处理流程如下:
http请求 -->数据采集接口–>数据目录–> flume监控目录[监控的目录下的文件是按照日期分的] -->Kafka -->Flink分析数据 --> Mysql[实时监控数据保存]

1.3 项目主要模块

在这里插入图片描述

2. 项目数据字典

2.1 卡口车辆采集数据

卡口数据通过Flume采集过来之后存入Kafka中,其中数据的格式为:

(`action_time` long  --摄像头拍摄时间戳,精确到秒, `monitor_id` string  --卡口号, `camera_id` string   --摄像头编号, `car` string  --车牌号码, `speed` double  --通过卡扣的速度, `road_id` string  --道路id, `area_id` string  --区域id, 
)

其中每个字段之间使用逗号隔开。
区域ID代表:一个城市的行政区域。
摄像头编号:一个卡口往往会有多个摄像头,每个摄像头都有一个唯一编号。
道路ID:城市中每一条道路都有名字,比如:蔡锷路。交通部门会给蔡锷路一个唯一编号。

2.2 城市交通管理数据表

Mysql数据库中有两张表是由城市交通管理平台提供的,本项目需要读取这两张表的数据来进行分析计算。
(1)城市区域表: t_area_info

DROP TABLE IF EXISTS `t_area_info`;
CREATE TABLE `area_info` (`area_id` varchar(255) DEFAULT NULL,`area_name` varchar(255) DEFAULT NULL
)
--导入数据
INSERT INTO `t_area_info` VALUES ('01', '海淀区');
INSERT INTO `t_area_info` VALUES ('02', '昌平区');
INSERT INTO `t_area_info` VALUES ('03', '朝阳区');
INSERT INTO `t_area_info` VALUES ('04', '顺义区');
INSERT INTO `t_area_info` VALUES ('05', '西城区');
INSERT INTO `t_area_info` VALUES ('06', '东城区');
INSERT INTO `t_area_info` VALUES ('07', '大兴区');
INSERT INTO `t_area_info` VALUES ('08', '石景山');

(2)城市“违法”车辆列表:
城市“违法”车辆,一般是指需要进行实时布控的违法车辆。

DROP TABLE IF EXISTS `t_violation_list`;
CREATE TABLE `t_violation_list` (`id` int(11) NOT NULL AUTO_INCREMENT,`car` varchar(255) DEFAULT NULL,`violation` varchar(1000) DEFAULT NULL,`create_time` bigint(20) DEFAULT NULL,`detail` varchar(1000) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

(3)城市卡口限速信息表:
城市中有些卡口有限制设置,一般超过当前限速的10%要扣分。

DROP TABLE IF EXISTS `t_monitor_info`;
CREATE TABLE `t_monitor_info` (`area_id` varchar(255) DEFAULT NULL,`road_id` varchar(255) NOT NULL,`monitor_id` varchar(255) NOT NULL,`speed_limit` int(11) DEFAULT NULL,PRIMARY KEY (`area_id`,`road_id`,`monitor_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
--导入数据
INSERT INTO `t_monitor_info` VALUES ('01','10','0000','60');
INSERT INTO `t_monitor_info` VALUES ('02','11','0001','60');
INSERT INTO `t_monitor_info` VALUES ('01','12','0002','80');
INSERT INTO `t_monitor_info` VALUES ('03','13','0003','100');

2.3 车辆轨迹数据表

在智能车辆布控模块中,需要保存一些车辆的实时行驶轨迹,为了方便其他部门和项目方便查询获取,我们在Mysql数据库设计一张车辆实时轨迹表。如果数据量太多,需要设置在HBase中。

DROP TABLE IF EXISTS `t_track_info`;
CREATE TABLE `t_track_info` (`id` int(11) NOT NULL AUTO_INCREMENT,`car` varchar(255) DEFAULT NULL,`action_time` bigint(20) DEFAULT NULL,`monitor_id` varchar(255) DEFAULT NULL,`road_id` varchar(255) DEFAULT NULL,`area_id` varchar(255) DEFAULT NULL,`speed` double DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3. 实时卡口监控分析

首先要实现的是实时卡口监控分析,由于前面课程项目中已经讲解了数据的ETL,本项目我们省略数据采集等ETL操作。我们将读取Kafka中的数据集来进行分析。
项目主体用Scala编写,采用IDEA作为开发环境进行项目编写,采用maven作为项目构建和管理工具。首先我们需要搭建项目框架。

3.1 创建Maven项目

打开IDEA,创建一个maven项目,我们整个项目需要的工具的不同版本可能会对程序运行造成影响,所以应该在porm.xml文件的最上面声明所有工具的版本信息。

在pom.xml中加入以下配置:

<properties><flink.version>1.9.1</flink.version><scala.binary.version>2.11</scala.binary.version><kafka.version>0.11.0.0</kafka.version>
</properties>

(1)添加项目依赖
对于整个项目而言,所有模块都会用到flink相关的组件,添加Flink相关组件依赖:

<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId>       <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_${scala.binary.version}</artifactId><version>${kafka.version}</version></dependency><dependency><groupId>org.apache.flink</groupId>          <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.8.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
</dependencies>

(2)添加Scala和打包插件

<build>
<plugins><!-- 该插件用于将Scala代码编译成class文件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.4.6</version><executions><execution><!-- 声明绑定到maven的compile阶段 --><goals><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin>
</plugins>
</build>

3.2 准备数据

由于在前面的课程中已经学过数据的采集和ETL,本项目不再赘述,现在我们直接随机生成数据到文件中(方便测试),同时也写入Kafka。

项目中模拟车辆速度数据和车辆经过卡扣个数使用到了高斯分布,高斯分布就是正态分布。“正态分布”(Normal Distribution)可以描述所有常见的事物和现象:正常人群的身高、体重、考试成绩、家庭收入等等。这里的描述是什么意思呢?就是说这些指标背后的数据都会呈现一种中间密集、两边稀疏的特征。以身高为例,服从正态分布意味着大部分人的身高都会在人群的平均身高上下波动,特别矮和特别高的都比较少见,正态分布非常常见。
在这里插入图片描述
基于以上所以需要在pom.xml中导入高斯分布需要的依赖包:

<dependency><groupId>org.apache.commons</groupId><artifactId>commons-math3</artifactId><version>3.6.1</version>
</dependency>

生成高斯标准分布的代码如下:

//获取随机数生成器
val generator: JDKRandomGenerator = new JDKRandomGenerator()
//随机生成高斯分布的数据
val grg: GaussianRandomGenerator = new GaussianRandomGenerator(generator)
//获取标准正态分布的数据
println(s"随机生成数据为:${grg.nextNormalizedDouble()}")

模拟生成数据的代码如下:

/*** 模拟生成数据,这里将数据生产到Kafka中,同时生成到文件中*/
object GeneratorData {def main(args: Array[String]): Unit = {//创建文件流val pw = new PrintWriter("./data/traffic_data")//创建Kafka 连接propertiesval props = new Properties()props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")val random = new Random()//创建Kafka produerval producer = new KafkaProducer[String,String](props)//车牌号使用的地区val locations = Array[String]("京","津","京","鲁","京","京","冀","京","京","粤","京","京")//模拟车辆个数,这里假设每日有30万辆车信息for(i <- 1 to 30000){//模拟每辆车的车牌号,"%05d".format(100000) %05d,d代表数字,5d代表数字长度为5位,不足位数前面补0 。 例如:京A88888val car =locations(random.nextInt(12))+(65+random.nextInt(26)).toChar+"%05d".format(random.nextInt(100000))//模拟车辆经过的卡扣数,使用高斯分布,假设正常每辆车每日经过卡扣有30个val generator = new GaussianRandomGenerator(new JDKRandomGenerator())val monitorThreshold: Int = 1+(generator.nextNormalizedDouble()*30).abs.toInt //generator.nextNormalizedDouble() 处于-1 ~ 1 之间//模拟拍摄时间val day = DateUtils.getTodayDate()var hour = DateUtils.getHour()var flag = 0for(j <- 1 to monitorThreshold){flag+=1//模拟monitor_id ,4位长度val monitorId = "%04d".format(random.nextInt(9))//模拟camear_id ,5为长度val camearId = "%05d".format(random.nextInt(100000))//模拟road_id ,2为长度val roadId = "%02d".format(random.nextInt(50))//模拟area_id ,2为长度val areaId = "%02d".format(random.nextInt(8))//模拟速度 ,使用高斯分布,速度大多位于90 左右val speed = "%.1f".format(60 + (generator.nextNormalizedDouble()*30).abs)//模拟action_timeif(flag % 30 == 0 && flag != 0 ){hour = (hour.toInt+1).toString}val currentTime = day+" "+hour+":"+DateUtils.getMinutesOrSeconds()+":"+DateUtils.getMinutesOrSeconds()//获取action_time 时间戳val actionTime: Long = DateUtils.getTimeStamp(currentTime)var oneInfo = s"$actionTime,$monitorId,$camearId,$car,$speed,$roadId,$areaId"println(s"oneInfo = $oneInfo")//写入文件:pw.write(oneInfo)pw.println()//写入kafka:producer.send(new ProducerRecord[String,String]("traffic-topic",oneInfo))}}pw.flush()pw.close()producer.close()}
}

3.3 实时车辆超速监控

在城市交通管理数据库中,存储了每个卡口的限速信息,但是不是所有卡口都有限速信息,其中有一些卡口有限制。Flink中有广播状态流,JobManger统一管理,TaskManger中正在运行的Task不可以修改这个广播状态。只能定时更新(自定义Source)。

我们通过实时计算,需要把所有超速超过10%的车辆找出来,并写入关系型数据库中。超速结果表如下:

DROP TABLE IF EXISTS `t_speeding_info`;
CREATE TABLE `t_speeding_info` (`id` int(11) NOT NULL AUTO_INCREMENT,`car` varchar(255) NOT NULL,`monitor_id` varchar(255) DEFAULT NULL,`road_id` varchar(255) DEFAULT NULL,`real_speed` double DEFAULT NULL,`limit_speed` int(11) DEFAULT NULL,
`action_time` bigint(20) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

在当前需求中,需要不定时的从数据库表中查询所有限速的卡口,再根据限速的卡口列表来实时的判断是否存在超速的车辆,如果找到超速的车辆,把这些车辆超速的信息保存到Mysql数据库的超速违章记录表中t_speeding_info。

我们把查询限速卡口列表数据作为一个事件流,车辆通行日志数据作为第二个事件流。广播状态可以用于通过一个特定的方式来组合并共同处理两个事件流。第一个流的事件被广播到另一个operator的所有并发实例,这些事件将被保存为状态。另一个流的事件不会被广播,而是发送给同一个operator的各个实例,并与广播流的事件一起处理。广播状态非常适合两个流中一个吞吐大,一个吞吐小,或者需要动态修改处理逻辑的情况。

我们对两个流使用了connect()方法,并在连接之后调用BroadcastProcessFunction接口处理两个流:

  • processBroadcastElement()方法:每次收到广播流的记录时会调用。将接收到的卡口限速记录放入广播状态中;
  • processElement()方法:接受到车辆通行日志流的每条消息时会调用。并能够对广播状态进行只读操作,以防止导致跨越类中多个并发实例的不同广播状态的修改。

代码如下:

/***   监控超速的车辆信息*   思路:从mysql中读取卡扣下的限速信息,通过广播流进行广播,然后与从kafka中读取的车流量监控事件流进行connect处理*     广播状态操作步骤:*       1).读取广播流的DStream数据*       2).将以上DStream数据广播出去*       3).主流与广播流进行Connect关联,调用 process 底层API处理*       4).实现process方法中 BroadcastProcessFunction 类下的两个方法进行数据处理*/
object OutOfSpeedMonitor {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//导入隐式转换import org.apache.flink.streaming.api.scala._env.setParallelism(1)val props = new Properties()props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")props.setProperty("group.id","testgroup1")props.setProperty("key.deserializer",classOf[StringDeserializer].getName)props.setProperty("value.deserializer",classOf[StringDeserializer].getName)props.setProperty("auto.offset.reset","latest")//读取Kafka中的监控车辆事件流val mainDStream: DataStream[TrafficLog] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props).setStartFromEarliest())
//    val mainDStream: DataStream[TrafficLog] = env.socketTextStream("mynode5",9999).map(line => {val arr: Array[String] = line.split(",")TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))})//广播状态流 - 卡扣限速信息val broadCastStream: BroadcastStream[MonitorLimitSpeedInfo] = env.addSource(new JdbcReadSource("MonitorLimitSpeedInfo")).map(one => {one.asInstanceOf[MonitorLimitSpeedInfo]}).broadcast(GlobalConstant.MONITOR_LIMIT_SPEED_DESCRIPTOR)val outOfSpeedCarInfoDStream: DataStream[OutOfSpeedCarInfo] = mainDStream.connect(broadCastStream).process(new BroadcastProcessFunction[TrafficLog, MonitorLimitSpeedInfo, OutOfSpeedCarInfo] {//当有车辆监控事件时会被调用override def processElement(trafficLog: TrafficLog, ctx: BroadcastProcessFunction[TrafficLog, MonitorLimitSpeedInfo, OutOfSpeedCarInfo]#ReadOnlyContext, out: Collector[OutOfSpeedCarInfo]): Unit = {//道路_卡扣val roadMonitor = trafficLog.roadId+"_"+trafficLog.monitorIdval info: MonitorLimitSpeedInfo = ctx.getBroadcastState(GlobalConstant.MONITOR_LIMIT_SPEED_DESCRIPTOR).get(roadMonitor)if (info != null) {//获取当前车辆真实的速度val realSpeed: Double = trafficLog.speed//获取当前卡扣限速信息val limitSpeed: Int = info.speedLimit//速度超过限速10% 就是超速车辆if (realSpeed > limitSpeed * 1.1) {out.collect(OutOfSpeedCarInfo(trafficLog.car, trafficLog.monitorId, trafficLog.roadId, realSpeed, limitSpeed, trafficLog.actionTime))}}}//每次收到广播流数据时,都会被调用,将接收到的卡扣限速记录放入到广播状态中override def processBroadcastElement(monitorLimitSpeedInfo: MonitorLimitSpeedInfo, ctx: BroadcastProcessFunction[TrafficLog, MonitorLimitSpeedInfo, OutOfSpeedCarInfo]#Context, out: Collector[OutOfSpeedCarInfo]): Unit = {val bcState: BroadcastState[String, MonitorLimitSpeedInfo] = ctx.getBroadcastState(GlobalConstant.MONITOR_LIMIT_SPEED_DESCRIPTOR)//key : 道路_卡扣 value :monitorLimitSpeedInfobcState.put(monitorLimitSpeedInfo.roadId+"_"+monitorLimitSpeedInfo.monitorId, monitorLimitSpeedInfo)}})//将超速车辆的结果保存到 mysql 表 t_speeding_info 中。val sink: JdbcWriteSink[OutOfSpeedCarInfo] = new JdbcWriteSink("OutOfSpeedCarInfo")outOfSpeedCarInfoDStream.addSink(sink)env.execute()}
}

3.4 实时卡口拥堵情况监控

卡口的实时拥堵情况,其实就是通过卡口的车辆平均车速,为了统计实时的平均车速,这里设定一个滑动窗口,窗口长度是为5分钟,滑动步长为1分钟。平均车速=当前窗口内通过车辆的车速之和 / 当前窗口内通过的车辆数量 ;并且在Flume采集数据的时候,我们发现数据可能出现时间乱序问题,最长迟到5秒。

实时卡口平均速度需要保存到Mysql数据库中,结果表设计为:

DROP TABLE IF EXISTS `t_average_speed`;
CREATE TABLE `t_average_speed` (`id` int(11) NOT NULL AUTO_INCREMENT,`start_time` bigint(20) DEFAULT NULL,`end_time` bigint(20) DEFAULT NULL,`monitor_id` varchar(255) DEFAULT NULL,`avg_speed` double DEFAULT NULL,`car_count` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

完整的代码:

object MonitorAvgSpeedMonitor {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.streaming.api.scala._val props = new Properties()props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")props.setProperty("group.id","testgroup2")props.setProperty("key.deserializer",classOf[StringDeserializer].getName)props.setProperty("value.deserializer",classOf[StringDeserializer].getName)//使用时间为 事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//设置线程为1env.setParallelism(1)//    val mainDStream: DataStream[TrafficLog] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props))val mainDStream: DataStream[TrafficLog] = env.socketTextStream("mynode5",9999).map(line => {val arr: Array[String] = line.split(",")val actionTime = arr(0).toLongval monitorId = arr(1)val cameraId = arr(2)val car = arr(3)val speed = arr(4).toDoubleval roadId = arr(5)val areaId = arr(6)TrafficLog(actionTime, monitorId, cameraId, car, speed, roadId, areaId)}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TrafficLog](Time.seconds(5)) {override def extractTimestamp(element: TrafficLog): Long = element.actionTime})mainDStream.keyBy(_.monitorId).timeWindow(Time.minutes(5),Time.minutes(1))//统计每个卡扣通过车辆数,统计每个卡扣下的车辆总速度和,使用增量函数.aggregate(new AggregateFunction[TrafficLog,(Int,Double),(Int,Double)] {override def createAccumulator(): (Int, Double) = (0,0.0)override def add(value: TrafficLog, accumulator: (Int, Double)): (Int, Double) = (accumulator._1+1,accumulator._2+value.speed)override def getResult(accumulator: (Int, Double)): (Int, Double) = accumulatoroverride def merge(a: (Int, Double), b: (Int, Double)): (Int, Double) = (a._1+b._1,a._2+b._2)},new ProcessWindowFunction[(Int,Double),MonitorAvgSpeedInfo,String,TimeWindow] {override def process(key: String, context: Context, elements: Iterable[(Int, Double)], out: Collector[MonitorAvgSpeedInfo]): Unit = {val monitorId  = keyval avgSpeed = (elements.last._2/elements.last._1).formatted("%.2f").toDoubleout.collect(new MonitorAvgSpeedInfo(context.window.getStart,context.window.getEnd,monitorId,avgSpeed,elements.last._1))}}).addSink(new JdbcWriteSink[MonitorAvgSpeedInfo]("MonitorAvgSpeedInfo"))env.execute()}

3.5 实时最通畅的TopN卡口

所谓的最通畅的卡口,其实就是当时的车辆数量最少的卡口。这里有两种实现方式,一种是基于上一个功能的基础上再次开启第二个窗口操作,然后使用AllWindowFunction实现一个自定义的TopN函数Top来计算车速排名前3名的卡口,并将排名结果格式化成字符串,便于后续输出。另外一种是使用窗口函数,对滑动窗口内的数据全量计算并排序计算。

(1)基于上个功能基础上,完整的代码:

/***  基于 "实时卡扣拥堵情况业务" 基础之上进行统计*/
object FindTop5MonitorInfo2 {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.streaming.api.scala._val props = new Properties()props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")props.setProperty("group.id","testgroup2")props.setProperty("key.deserializer",classOf[StringDeserializer].getName)props.setProperty("value.deserializer",classOf[StringDeserializer].getName)//使用时间为 事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//设置线程为1env.setParallelism(1)val mainDStream: DataStream[TrafficLog] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props).setStartFromEarliest())
//    val mainDStream: DataStream[TrafficLog] = env.socketTextStream("mynode5",9999).map(line => {val arr: Array[String] = line.split(",")val actionTime = arr(0).toLongval monitorId = arr(1)val cameraId = arr(2)val car = arr(3)val speed = arr(4).toDoubleval roadId = arr(5)val areaId = arr(6)TrafficLog(actionTime, monitorId, cameraId, car, speed, roadId, areaId)}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TrafficLog](Time.seconds(5)) {override def extractTimestamp(element: TrafficLog): Long = element.actionTime})val monitorAvgSpeedDStream: DataStream[MonitorAvgSpeedInfo] = mainDStream.keyBy(_.monitorId).timeWindow(Time.minutes(5), Time.minutes(1))//统计每个卡扣通过车辆数,统计每个卡扣下的车辆总速度和,使用增量函数.aggregate(new AggregateFunction[TrafficLog, (Int, Double), (Int, Double)] {override def createAccumulator(): (Int, Double) = (0, 0.0)override def add(value: TrafficLog, accumulator: (Int, Double)): (Int, Double) = (accumulator._1 + 1, accumulator._2 + value.speed)override def getResult(accumulator: (Int, Double)): (Int, Double) = accumulatoroverride def merge(a: (Int, Double), b: (Int, Double)): (Int, Double) = (a._1 + b._1, a._2 + b._2)},new ProcessWindowFunction[(Int, Double), MonitorAvgSpeedInfo, String, TimeWindow] {override def process(key: String, context: Context, elements: Iterable[(Int, Double)], out: Collector[MonitorAvgSpeedInfo]): Unit = {val monitorId = keyval avgSpeed = (elements.last._2 / elements.last._1).formatted("%.2f").toDoubleout.collect(new MonitorAvgSpeedInfo(context.window.getStart, context.window.getEnd, monitorId, avgSpeed, elements.last._1))}}).assignAscendingTimestamps(masi => {masi.endTime})//设置下一个窗口的时间//这里设置一个滚动窗口,每隔1分钟,对以上所有卡扣对应的平均速度进行排序,得到对应的结果monitorAvgSpeedDStream.timeWindowAll(Time.minutes(1)).process(new ProcessAllWindowFunction[MonitorAvgSpeedInfo,String,TimeWindow] {override def process(context: Context, elements: Iterable[MonitorAvgSpeedInfo], out: Collector[String]): Unit = {val builder = new StringBuilder(s"窗口起始时间:${context.window.getStart} - ${context.window.getEnd},最拥堵的前3个卡扣信息如下:")val infoes: List[MonitorAvgSpeedInfo] = elements.toList.sortWith((masi1,masi2)=>{masi1.avgSpeed > masi2.avgSpeed}).take(3)for(masi <- infoes){builder.append(s"monitorId : ${masi.monitorId},avgSpeed : ${masi.avgSpeed} |")}out.collect(builder.toString())}}).print()env.execute()}
}

(2)滑动窗口全量计算:

object FindTop5MonitorInfo1 {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//导入隐式转换import org.apache.flink.streaming.api.scala._//设置并行度为1env.setParallelism(1)//设置事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val props = new Properties()props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")props.setProperty("group.id","testgroup3")props.setProperty("key.deserializer",classOf[StringDeserializer].getName)props.setProperty("value.deserializer",classOf[StringDeserializer].getName)val mainDStream: DataStream[TrafficLog] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props).setStartFromEarliest())
//    val mainDStream: DataStream[TrafficLog] = env.socketTextStream("mynode5",9999).map(line => {val arr: Array[String] = line.split(",")TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TrafficLog](Time.seconds(5)) {override def extractTimestamp(element: TrafficLog): Long = element.actionTime})mainDStream.timeWindowAll(Time.minutes(1)).aggregate(//返回数据为 Map[String,Double] => Map[卡扣,平均速度]new AggregateFunction[TrafficLog,Map[String,(Int,Double)],Map[String,Double]]{//初始化一个Map[卡扣,(当前卡扣对应总车辆数,当前卡扣下所有车辆总速度和)]override def createAccumulator(): Map[String, (Int, Double)] = Map()override def add(value: TrafficLog, accMap: Map[String, (Int, Double)]): Map[String, (Int, Double)] = {//获取当前一条数据的monitorIDval monitorId: String = value.monitorIdif(accMap.contains(monitorId)){//当前map中包含此卡扣accMap.put(monitorId,(accMap.get(monitorId).get._1+1,accMap.get(monitorId).get._2+value.speed))}else{accMap.put(monitorId,(1,value.speed))}accMap}override def getResult(accumulator: Map[String,(Int, Double)]): Map[String, Double] = {accumulator.map(tp=>{val monitorId: String = tp._1val totalCarCount: Int = tp._2._1val totalSpeed: Double = tp._2._2(monitorId,(totalSpeed/totalCarCount).formatted("%.2f").toDouble)})}//合并不同线程处理的数据override def merge(a: Map[String, (Int, Double)], b: Map[String, (Int, Double)]): Map[String, (Int, Double)] = {b.foreach(tp=>{val monitorId: String = tp._1val carCount: Int = tp._2._1val totalSpeed: Double = tp._2._2if(a.contains(monitorId)){//第一个map中包含当前卡扣数据a.put(monitorId,(a.get(monitorId).get._1 + carCount,a.get(monitorId).get._2+totalSpeed))}else{//第一个map中不包含当前卡扣数据a.put(monitorId,tp._2)}})a}},new AllWindowFunction[Map[String, Double],String,TimeWindow] {override def apply(window: TimeWindow, input: scala.Iterable[mutable.Map[String, Double]], out: Collector[String]): Unit = {val tuples: List[(String, Double)] = input.last.toList.sortWith((tp1,tp2)=>{tp1._2 > tp2._2}).take(3)val returnStr = new StringBuilder(s"窗口起始时间:${window.getStart} - ${window.getEnd} ,最拥堵前3个卡扣信息 :")for(tp <- tuples){returnStr.append(s"monitorId = ${tp._1} ,avgSpeed = ${tp._2} |")}out.collect(returnStr.toString())}}).print()env.execute()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/226367.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

车路协同中 CUDA 鱼眼相机矫正、检测、追踪

在车路协同中,鱼眼一般用来补充杆件下方的盲区,需要实现目标检测、追踪、定位。在目标追踪任务中,通常的球机或者枪机方案,无法避免人群遮挡的问题,从而导致较高的ID Swich,造成追踪不稳定。但是鱼眼相机的顶视角安装方式,天然缓解了遮挡的问题,从而实现杆件下方的盲区…

51单片机(STC8)-- GPIO输入输出

文章目录 I/O口相关寄存器端口数据寄存器端口模式配置寄存器&#xff08;PxM0&#xff0c;PxM1&#xff09;端口上拉电阻控制寄存器(PxPU)关于I/O的注意事项 配置I/O口I/O设置demoI/O端口模式LED控制&#xff08;I/O输出&#xff09;按键检测&#xff08;I/O输入&#xff09; S…

SpringBoot多线程与任务调度总结

一、前言 多线程与任务调度是java开发中必须掌握的技能&#xff0c;在springBoot的开发中&#xff0c;多线程和任务调度变得越来越简单。实现方式可以通过实现ApplicationRunner接口&#xff0c;重新run的方法实现多线程。任务调度则可以使用Scheduled注解 二、使用示例 Slf…

vue3中使用defineComponent封装hook实现模板复用

文章目录 一、前言二、useTemplate 实现三、最后 一、前言 最近在做 Vue3 项目的时候&#xff0c;在思考一个小问题&#xff0c;其实是每个人都做过的一个场景&#xff0c;很简单&#xff0c;看下方代码 <template><div><div v-for"item in data" :…

Eclipse安装Jrebel eclipse免重启加载项目

每次修改JAVA文件都需要重新启动项目&#xff0c;加载时间太长&#xff0c;eclipse安装jrebel控件,避免重启项目节省时间。 1、Help->Eclipse Marketplace 2、搜索jrebel 3、Help->jrebel->Configuration 配置jrebel 4、激活jrebel 5、在红色框中填入 http://jrebel…

HCIA-Datacom题库(自己整理分类的)——OSPF协议判断

1.路由表中某条路由信息的Proto为OSPF则此路由的优先级一定为10。√ 2.如果网络管理员没有配置骨干区域,则路由器会自动创建骨干区域&#xff1f; 路由表中某条路由信息的Proto为OSPF&#xff0c;则此路由的优先级一定为10。 当两台OSPF路由器形成2-WAY邻居关系时&#xff0…

python-39-flask+nginx+Gunicorn的组合应用

flask nginx Gunicorn 王炸 1 flasknginxgunicornsupervisor 1.1 myapp.py from flask import Flask app Flask(__name__)app.route("/") def test_link():return "the link is very good"if __name__"__main__":app.run()默认是5000端口…

Java开发框架和中间件面试题(10)

目录 104.怎么保证缓存和数据库数据的一致性&#xff1f; 105.什么是缓存穿透&#xff0c;什么是缓存雪崩&#xff1f;怎么解决&#xff1f; 106.如何对数据库进行优化&#xff1f; 107.使用索引时有哪些原则&#xff1f; 108.存储过程如何进行优化&#xff1f; 109.说说…

听GPT 讲Rust源代码--src/tools(29)

File: rust/src/tools/clippy/clippy_lints/src/unused_peekable.rs 在Rust源代码中&#xff0c;rust/src/tools/clippy/clippy_lints/src/unused_peekable.rs这个文件是Clippy工具中一个特定的Lint规则的实现文件&#xff0c;用于检测未使用的Peekable迭代器。 Peekable迭代器…

[BUG] Hadoop-3.3.4集群yarn管理页面子队列不显示任务

1.问题描述 使用yarn调度任务时&#xff0c;在CapacityScheduler页面上单击叶队列&#xff08;或子队列&#xff09;时&#xff0c;不会显示应用程序任务信息&#xff0c;root队列可以显示任务。此外&#xff0c;FairScheduler页面是正常的。 No matching records found2.原…

Unreal Engine游戏引擎的优势

在现在这个繁荣的游戏开发行业中&#xff0c;选择合适的游戏引擎是非常重要的。其中&#xff0c;Unreal Engine作为一款功能强大的游戏引擎&#xff0c;在业界广受赞誉。那Unreal Engine游戏引擎究竟有哪些优势&#xff0c;带大家简单的了解一下。 图形渲染技术 Unreal Engin…

【计算机网络实验】educoder实验八 IPV6网络及其路由 头歌

第一关 IPV6网络基础 //千万不要破坏文档原有结构与内容&#xff01;&#xff01;&#xff01; //以下均为判断题&#xff0c;F&#xff1a;表示错误&#xff0c;T&#xff1a;表示正确 //答案必须写在相应行末尾括号内&#xff0c;F与T二选一&#xff0c;大写 // 1、ipv6协议…

Flink1.17实战教程(第七篇:Flink SQL)

系列文章目录 Flink1.17实战教程&#xff08;第一篇&#xff1a;概念、部署、架构&#xff09; Flink1.17实战教程&#xff08;第二篇&#xff1a;DataStream API&#xff09; Flink1.17实战教程&#xff08;第三篇&#xff1a;时间和窗口&#xff09; Flink1.17实战教程&…

Azure 学习总结

文章目录 1. Azure Function1.1 Azure Function 概念1.2 Azure Function 实现原理1.3 Azure Function 本地调试1.4 Azure Function 云部署 2. Azure API Managment 概念 以及使用2.1 Azure API 概念2.2 Azure API 基本使用 3. Service Bus 应用场景及相关特性3.1 Service Bus 基…

django之drf框架(排序、过滤、分页、异常处理)

排序 排序的快速使用 1.必须是继承GenericAPIView及其子类才能是用排序 导入OrderingFilter类&#xff0c;from rest_framework.filters import OrderingFilter 2.在类中配置类属性 filter_backends[OrderingFilter] 3.类中写属性 ordering_fields [price,id] # 必须是表的…

【论文阅读】Realtime multi-person 2d pose estimation using part affinity fields

OpenPose&#xff1a;使用PAF的实时多人2D姿势估计。 code&#xff1a;GitHub - ZheC/Realtime_Multi-Person_Pose_Estimation: Code repo for realtime multi-person pose estimation in CVPR17 (Oral) paper&#xff1a;[1611.08050] Realtime Multi-Person 2D Pose Estima…

Docker安装Grafana

1. 介绍 Grafana 是一个开源的度量分析和可视化工具&#xff0c;可以通过将采集的数据分析、查询&#xff0c;然后进行可视化的展示&#xff0c;并能实现报警。参考官网地址&#xff1a;Run Grafana Docker image | Grafana documentation 2. 安装Grafana (1) . 下载 命令&…

中北大学 软件构造 U+及上课代码详解

作业1 1.数据类型可分为两类:(原子类型) 、结构类型。 2.(数据结构)是计算机存储、组织数据的方式&#xff0c;是指相互之间存在一种或多种特定关系的数据元素的集合 3.代码重构指的是改变程序的(结构)而不改变其行为&#xff0c;以便提高代码的可读性、易修改性等。 4.软件实…

HCIA-Datacom题库(自己整理分类的)——OSPF协议多选

ospf的hello报文功能是 邻居发现 同步路由器的LSDB 更新LSA信息 维持邻居关系 下列关于OSPF区域描述正确的是 在配置OSPF区域正确必须给路由器的loopback接配置IP地址 所有的网络都应在区域0中宣告 骨干区域的编号不能为2 区域的编号范围是从0.0.0.0到255.255.255.255…

Python基础语法总结

1.每条语句结束不需要分号(也可以加上), 直接换行, 注意: 如果两行代码写一行, 则必须加分号. 2.定义变量不需要指定类型(如果需要写类型, 需要在变量名后面加": 类型, 这个写法只是方便读代码). 3.变量名大小写敏感. 4.查看变量类型: type(变量名). 5.Python中的int表…