【基于Flink的城市交通实时监控平台】需求一:卡口车辆超速情况检测

案例需求:

从kafka的topic-car中读取卡口数据,将超速车辆写入mysql的select * from t_speeding_info表,当通过卡口的车速超过60就认定为超速

卡口数据格式:

 `action_time` long  --摄像头拍摄时间戳,精确到秒, `monitor_id` string  --卡口号, `camera_id` string   --摄像头编号, `car` string  --车牌号码, `speed` double  --通过卡口的速度, `road_id` string  --道路id, `area_id` string  --区域id, 

其中每个字段之间使用逗号隔开。
例如:1682219447,0001,1,豫DF09991,34.5,01,20
区域ID代表:一个城市的行政区域。
摄像头编号:一个卡口往往会有多个摄像头,每个摄像头都有一个唯一编号。
道路ID:城市中每一条道路都有名字,比如:航海路。交通部门会给航海路一个唯一编号。

MySQL建表语句

注意这个t_monitor_info是限速信息表

CREATE TABLE `t_monitor_info` (`monitor_id` varchar(255) NOT NULL,  `road_id` varchar(255) NOT NULL,`speed_limit` int(11) DEFAULT NULL,`area_id` varchar(255) DEFAULT NULL,PRIMARY KEY (`monitor_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

限速信息:
在这里插入图片描述
超速表:

DROP TABLE IF EXISTS `t_speeding_info`;
CREATE TABLE `t_speeding_info` (`id` int(11) NOT NULL AUTO_INCREMENT,`car` varchar(255) NOT NULL,`monitor_id` varchar(255) DEFAULT NULL,`road_id` varchar(255) DEFAULT NULL,`real_speed` double DEFAULT NULL,`limit_speed` int(11) DEFAULT NULL,
`action_time` bigint(20) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

项目代码

该需求使用两个.java文件编写,分为项目代码和javaBean代码。

Test1_OutSpeedMonitor.java:

package day110612;import bean.MonitorInfo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import util.JdbcUtils;import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Properties;public class Test1_OutSpeedMonitor {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "hadoop10:9092");properties.setProperty("group.id", "car-group1");DataStream<String> ds1 = env.addSource(new FlinkKafkaConsumer<>("topic-car",new SimpleStringSchema(),properties));SingleOutputStreamOperator<MonitorInfo> ds2 = ds1.map(new MapFunction<String, MonitorInfo>() {@Overridepublic MonitorInfo map(String value) throws Exception {String[] arr = value.split(",");return new MonitorInfo(Long.parseLong(arr[0]),arr[1],arr[2],arr[3],Double.parseDouble(arr[4]), arr[5], arr[6]);}});ds2.filter(new RichFilterFunction<MonitorInfo>() {Connection connection;PreparedStatement ps;ResultSet rs;@Overridepublic void open(Configuration parameters) throws Exception {connection = JdbcUtils.getconnection();ps = connection.prepareStatement("select speed_limit from t_monitor_info where monitor_id = ?");}@Overridepublic boolean filter(MonitorInfo value) throws Exception {ps.setString(1, value.getMonitorId());rs = ps.executeQuery();//如果t_monitor_info无法查询出该卡口的编号,则给定一个60的限速int speed_limit = 60;if (rs.next()) {speed_limit = rs.getInt("speed_limit");}value.setSpeedLimit(speed_limit);return value.getSpeed() > speed_limit * 1.1; //超速10%,判定为超速}@Overridepublic void close() throws Exception {JdbcUtils.release(rs, ps, connection);}}).addSink(JdbcSink.sink("insert into t_speeding_info values(null,?,?,?,?,?,?)",(PreparedStatement ps, MonitorInfo monitorInfo) -> {ps.setString(1, monitorInfo.getCar());ps.setString(2, monitorInfo.getMonitorId());ps.setString(3, monitorInfo.getRoadId());ps.setDouble(4, monitorInfo.getSpeed());ps.setInt(5, monitorInfo.getSpeedLimit());ps.setLong(6, monitorInfo.getActionTime());},JdbcExecutionOptions.builder().withBatchSize(1).withBatchIntervalMs(5000).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://hadoop10:3306/yangyulin?useSSL=false&useUnicode=true&characterEncoding=utf8").withDriverName("com.mysql.jdbc.Driver").withUsername("root").withPassword("0000").build()));env.execute();}
}

MonitorInfo.java:

package bean;//这是郭亚超的java豆
public class MonitorInfo {private Long actionTime;private String monitorId;private String cameraId;private String car;private Double speed;  //车辆通过卡口的实际车速private String roadId;private String areaId;private Integer speedLimit;  //卡口的限速public MonitorInfo() {}public MonitorInfo(Long actionTime, String monitorId, String cameraId, String car, Double speed, String roadId, String areaId) {this.actionTime = actionTime;this.monitorId = monitorId;this.cameraId = cameraId;this.car = car;this.speed = speed;this.roadId = roadId;this.areaId = areaId;}public MonitorInfo(Long actionTime, String monitorId, String cameraId, String car, Double speed, String roadId, String areaId, Integer speedLimit) {this.actionTime = actionTime;this.monitorId = monitorId;this.cameraId = cameraId;this.car = car;this.speed = speed;this.roadId = roadId;this.areaId = areaId;this.speedLimit = speedLimit;}/*** 获取** @return actionTime*/public Long getActionTime() {return actionTime;}/*** 设置** @param actionTime*/public void setActionTime(Long actionTime) {this.actionTime = actionTime;}/*** 获取** @return monitorId*/public String getMonitorId() {return monitorId;}/*** 设置** @param monitorId*/public void setMonitorId(String monitorId) {this.monitorId = monitorId;}/*** 获取** @return cameraId*/public String getCameraId() {return cameraId;}/*** 设置** @param cameraId*/public void setCameraId(String cameraId) {this.cameraId = cameraId;}/*** 获取** @return car*/public String getCar() {return car;}/*** 设置** @param car*/public void setCar(String car) {this.car = car;}/*** 获取** @return speed*/public Double getSpeed() {return speed;}/*** 设置** @param speed*/public void setSpeed(Double speed) {this.speed = speed;}/*** 获取** @return roadId*/public String getRoadId() {return roadId;}/*** 设置** @param roadId*/public void setRoadId(String roadId) {this.roadId = roadId;}/*** 获取** @return areaId*/public String getAreaId() {return areaId;}/*** 设置** @param areaId*/public void setAreaId(String areaId) {this.areaId = areaId;}/*** 获取** @return speedLimit*/public Integer getSpeedLimit() {return speedLimit;}/*** 设置** @param speedLimit*/public void setSpeedLimit(Integer speedLimit) {this.speedLimit = speedLimit;}public String toString() {return "MonitorInfo{actionTime = " + actionTime + ", monitorId = " + monitorId + ", cameraId = " + cameraId + ", car = " + car + ", speed = " + speed + ", roadId = " + roadId + ", areaId = " + areaId + ", speedLimit = " + speedLimit + "}";}
}

java bean的生成使用了Idea插件。

代码解释

这段代码实现了对车辆超速信息的实时监控和存储。代码解释如下:1. 导入所需的类和包。2. 创建`StreamExecutionEnvironment`实例。3. 创建Kafka的连接配置,并设置相关属性。4. 创建一个`FlinkKafkaConsumer`,用于从Kafka主题中接收数据流。5. 使用`map`函数将接收到的文本数据转换为`MonitorInfo`对象。6. 使用`filter`函数对超速的车辆进行过滤。- 在`open`方法中,建立与数据库的连接,并准备查询语句。- 在`filter`方法中,根据卡口ID查询对应的限速值,并将查询结果设置到`MonitorInfo`对象中。- 如果无法查询到限速值,则将限速值设置为默认值60。- 判断车辆的实际速度是否超过限速值的10%(超速10%判定为超速),返回布尔值。7. 在`addSink`中使用`JdbcSink.sink()`方法将超速的车辆信息写入到MySQL数据库。- 设置插入数据的SQL语句,使用占位符表示待填充的参数。- 使用lambda表达式定义参数填充逻辑,将`MonitorInfo`对象中的字段值设置到预编译语句中的对应位置。- 使用`JdbcExecutionOptions`设置批处理大小和间隔时间。- 使用`JdbcConnectionOptions`设置数据库连接信息。8. 调用`env.execute()`方法启动Flink程序的执行。总体来说,该代码通过从Kafka接收车辆数据流,并对超速的车辆进行监控和存储。在过滤阶段,根据卡口ID查询对应的限速值,并判断车辆的实际速度是否超过限速值的10%。超速的车辆信息将被写入到MySQL数据库中。

以上分析来自ChatGPT3.5,由我整理完善。

测试流程

测试数据

1686647522,0002,1,豫A99999,100.5,01,20
1686647522,0002,1,豫A99999,80.8,02,20
1686647522,0002,1,豫A99999,90.5,03,20
1686647522,0002,1,豫A99999,90.4,03,20

通过kafka发送在这里插入图片描述

限速对照表

在这里插入图片描述
这里我写错了,当时road_id和area_id没有分清楚,应该是按照道路ID匹配限速,我当成了区域ID,但是不影响需求的实现流程。
超速车辆已经写入MySQL:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/5903.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

chatgpt赋能python:Python下载工具:提高工作效率的不二之选

Python下载工具&#xff1a;提高工作效率的不二之选 作为一名有10年Python编程经验的工程师&#xff0c;我深知一款好用的下载工具对于我们的工作效率有多么重要。因此&#xff0c;在众多Python工具中&#xff0c;我多次选用了一些好用的下载工具&#xff0c;并且对它们进行了…

将 ChatGLM2-6B 部署成 OpenAI API 服务

将 ChatGLM2-6B 部署成 OpenAI API 服务 0. 背景1. FastChat 部署使用 ChatGLM2-6B1-1. 创建虚拟环境1-2. 克隆代码1-3. 安装依赖库1-4. 使用 UI 进行推理1-5. 使用 OpenAI API 方式进行推理 0. 背景 最近一直在使用 OpenAI 的 API 做一些学习和调研。使用 OpenAI 的 API&…

chatgpt赋能Python-python_downloader

优秀Python下载器的重要性 在今天的数字化世界中&#xff0c;下载器是一个极其重要的工具。随着互联网速度的不断提升和存储设备的容量的增加&#xff0c;大量的数据和文件需要及时下载到本地计算机或存储设备中。许多编程语言都提供了相应的下载库&#xff0c;但Python是其中…

免费使用GPT-4的N种方法

很多朋友因为各种限制无法开通#ChatGPT Plus,而申请OpenAI的GPT-4 API也要慢慢排队(我的也还没下来)。于是在这里我搜集了X个可以免费使用的方法。 注:哪有什么真正免费,只不过有人在替你付钱。因此下述的方法都有限制,也有些可能会很快失效。新方法随时更新。 方法一: …

ChatGPT 拓展资料:ChatGPT插件系统上线 卷众生入局,燃天地斗气!

ChatGPT 拓展资料:ChatGPT插件系统上线 卷众生入局,燃天地斗气! ChatGPT 插件 我们已经在 ChatGPT 中实现了对插件的初步支持。插件是专门为以安全为核心原则的语言模型设计的工具,可帮助 ChatGPT 访问最新信息、运行计算或使用第三方服务。 根据我们的迭代部署理念,我们…

ChatGPT开始联网,最后的封印解除了

省时查报告-专业、及时、全面的行研报告库 省时查方案-专业、及时、全面的营销策划方案库 【免费下载】2023年2月份热门报告合集 最新亲测国内可用ChatGPT使用教程&#xff08;3分钟搞定&#xff09; 文心一言、GPT3.5及GPT4应用测评对比报告 ChatGPT团队背景研究报告 ChatGPT的…

chatgpt最大的竞争对手-claude

介绍 Claude是Anthropic公司开发的AI聊天机器人&#xff0c;与ChatGPT类似&#xff0c;由OpenAI前副总裁创办。和虽然比不上GPT4&#xff0c;但在连续对话能力、写小说、编写代码、解释概念等方面表现出色。 Claude是Anthropic公司开发的大语言模型(LLM)&#xff0c;主要特点…

巧用 ChatGPT,让开发者的学习和工作更轻松

引言 随着人工智能技术的快速发展和广泛应用&#xff0c;ChatGPT 作为一种新兴的自然语言处理模型&#xff0c;近期备受瞩目&#xff0c;引发了广泛讨论。 ChatGPT 具有多种应用场景&#xff0c;既可以用作聊天机器人&#xff0c;实现智能问答和自然语言交互&#xff0c;也可…

【promptulate专栏】使用ChatGPT和XMind快速构建思维导图

本文节选自笔者博客&#xff1a;https://www.blog.zeeland.cn/archives/ao302950h3j &#x1f496; 作者简介&#xff1a;大家好&#xff0c;我是Zeeland&#xff0c;全栈领域优质创作者。&#x1f4dd; CSDN主页&#xff1a;Zeeland&#x1f525;&#x1f4e3; 我的博客&#…

ChatGPT常用的指令(prompts)系列十——职业顾问、私人教练、心理健康顾问

系列文章目录 内容翻译自&#xff1a;https://github.com/f/awesome-chatgpt-prompts&#xff0c;并加入自己的实践内容 1、 ChatGPT常用的提示语&#xff08;prompts&#xff09;系列一 2、 ChatGPT常用的提示语&#xff08;prompts&#xff09;系列二 3、 ChatGPT常用的提示语…

ChatGPT为什么能够火出圈

最近ChatGPT可以说是火遍了全世界&#xff0c;作为由知名人工智能研究机构OpenAI于2022年11月30日发布的一个大型语言预训练模型&#xff0c;他的核心在于能够理解人类的自然语言&#xff0c;并使用贴近人类语言风格的方式来进行回复。模型开放使用以来&#xff0c;在人工智能领…

ChatGPT为什么会一本正经胡说八道?我们如何改进它?| 文内附有代码

ChatGPT为什么会一本正经胡说八道&#xff1f;我们如何改进它&#xff1f;| 文内附有代码 众所周知&#xff0c;在OpenAI平台上的ChatGPT模型目前有两大痛点&#xff1a;1. 它所学习的数据资料都是截止到2021年为止的&#xff0c;因此无法给出2022年之后的发生的事情。2. 有些时…

干货!150个chatgpt指令大全!chatGPT输出结果的质量高低,和你使用什么样质量的输入内容有关。

干货&#xff01;150个chatgpt指令大全 chatGPT输出结果的质量高低&#xff0c;和你使用什么样质量的输入内容有关。 在外网有大佬们已经整理出一些标准的问话模板&#xff0c;直接拿来使用后&#xff0c;效果极佳&#xff01; 把已经过验证的优质问法可以直接拿来用&#xf…

我用 ChatGPT 干的 18 件事!

&#x1f447;&#x1f447;关注后回复 “进群” &#xff0c;拉你进程序员交流群&#x1f447;&#x1f447; 来自&#xff1a;CSDN&#xff0c;作者&#xff1a;ㄣ知冷煖★ 原文链接&#xff1a; https://blog.csdn.net/weixin_42475060/article/details/129399125 版权声明…

[Golang实战]如何快速接入chatgpt/openai?[引入go-gpt3][新手开箱可用]

如何快速接入chatgpt?[引入go-gpt3] 上文介绍了如何在网页使用chatgpt?V1.介绍下在golang中使用chatgpt?1.查看官网推荐的chatgpt项目2.访问go-gpt33.使用并运行在自己的项目中...(是因为例子很难理解,所以一一对应了属性做了配置)3.1安装项目3.2换上自己的代码3.3换上自己的…

用ChatGPT操控机器人,微软开启居家机器人新纪元!

编&#xff5c;LRS 源&#xff5c;新智元 ChatGPT不止会动嘴&#xff0c;还能帮你操控无人机&#xff01; 虽然ChatGPT已经被调教为符合人类的偏好&#xff0c;但在各种反向操作下&#xff0c;还是能够逼问出一些「不道德的内容」&#xff0c;比如ChatGPT可以给你列一份详细的…

ChatGPT:微软人工智能Office和电邮即将登场...

PS&#xff1a;欢迎大家关注我的Twitter&#xff1a;Alphatu4 &#xff08;深夜写稿不易&#xff0c;会有很多独家内容&#xff09; 欢迎点击在看、转发~谢谢大家&#xff01;&#x1f407; *转载请扫码添加后台微信二维码&#xff0c;转载请注明来源&#xff0c;且附上本文的…

联网、多模态版ChatGPT?微软BingChat评测,New Bing竟然是个大美女?

微软开放了New Bing&#xff0c;大家都可以注册使用了&#xff0c;详细见我的文章 北方的郎&#xff1a;微软放大招&#xff0c;所有人都能用New Bing了 今天把它的功能简单评测一下&#xff0c;首先如果想要体验多模态&#xff0c;要选择更有创造力选项。 首先让它画一张小猫…

ChatGPT淘汰程序员?不可能的!看代码生成机器人如何让我更强。

AIGC让程序员失业&#xff1f;不存在的&#xff01;聆思开发聊天助手Chaty让你更高效地开发CSK芯片代码、更自在地摸鱼&#xff01; 当遇上十万火急&#xff0c;产品提完需求马上要怎么办&#xff1f; 且看Chaty如何破局 以往流程&#xff1a; Chaty加持&#xff1a; Chat…

周鸿祎称搭不上ChatGPT企业会被淘汰;马斯克会议现场解雇推特高级工程师;同时应聘十几个工作靠裁员补偿年入千万 | EA周报...

EA周报 2023年2月10日 每个星期1分钟&#xff0c;元宝带你喝一杯IT人的浓缩咖啡&#xff0c;了解天下事、掌握IT核心技术。 周报看点 1、周鸿祎谈 ChatGPT&#xff1a;搭不上这班车的企业会被淘汰 2、马斯克被爆当场解雇推特工程师 原因竟是人气下降 3、苏宁张康阳遭建行全球追…