案例需求:
从kafka的topic-car中读取卡口数据,将超速车辆写入mysql的select * from t_speeding_info表,当通过卡口的车速超过60就认定为超速
卡口数据格式:
`action_time` long --摄像头拍摄时间戳,精确到秒, `monitor_id` string --卡口号, `camera_id` string --摄像头编号, `car` string --车牌号码, `speed` double --通过卡口的速度, `road_id` string --道路id, `area_id` string --区域id,
其中每个字段之间使用逗号隔开。
例如:1682219447,0001,1,豫DF09991,34.5,01,20
区域ID代表:一个城市的行政区域。
摄像头编号:一个卡口往往会有多个摄像头,每个摄像头都有一个唯一编号。
道路ID:城市中每一条道路都有名字,比如:航海路。交通部门会给航海路一个唯一编号。
MySQL建表语句
注意这个t_monitor_info是限速信息表
CREATE TABLE `t_monitor_info` (`monitor_id` varchar(255) NOT NULL, `road_id` varchar(255) NOT NULL,`speed_limit` int(11) DEFAULT NULL,`area_id` varchar(255) DEFAULT NULL,PRIMARY KEY (`monitor_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
限速信息:
超速表:
DROP TABLE IF EXISTS `t_speeding_info`;
CREATE TABLE `t_speeding_info` (`id` int(11) NOT NULL AUTO_INCREMENT,`car` varchar(255) NOT NULL,`monitor_id` varchar(255) DEFAULT NULL,`road_id` varchar(255) DEFAULT NULL,`real_speed` double DEFAULT NULL,`limit_speed` int(11) DEFAULT NULL,
`action_time` bigint(20) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
项目代码
该需求使用两个.java文件编写,分为项目代码和javaBean代码。
Test1_OutSpeedMonitor.java:
package day110612;import bean.MonitorInfo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import util.JdbcUtils;import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Properties;public class Test1_OutSpeedMonitor {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "hadoop10:9092");properties.setProperty("group.id", "car-group1");DataStream<String> ds1 = env.addSource(new FlinkKafkaConsumer<>("topic-car",new SimpleStringSchema(),properties));SingleOutputStreamOperator<MonitorInfo> ds2 = ds1.map(new MapFunction<String, MonitorInfo>() {@Overridepublic MonitorInfo map(String value) throws Exception {String[] arr = value.split(",");return new MonitorInfo(Long.parseLong(arr[0]),arr[1],arr[2],arr[3],Double.parseDouble(arr[4]), arr[5], arr[6]);}});ds2.filter(new RichFilterFunction<MonitorInfo>() {Connection connection;PreparedStatement ps;ResultSet rs;@Overridepublic void open(Configuration parameters) throws Exception {connection = JdbcUtils.getconnection();ps = connection.prepareStatement("select speed_limit from t_monitor_info where monitor_id = ?");}@Overridepublic boolean filter(MonitorInfo value) throws Exception {ps.setString(1, value.getMonitorId());rs = ps.executeQuery();//如果t_monitor_info无法查询出该卡口的编号,则给定一个60的限速int speed_limit = 60;if (rs.next()) {speed_limit = rs.getInt("speed_limit");}value.setSpeedLimit(speed_limit);return value.getSpeed() > speed_limit * 1.1; //超速10%,判定为超速}@Overridepublic void close() throws Exception {JdbcUtils.release(rs, ps, connection);}}).addSink(JdbcSink.sink("insert into t_speeding_info values(null,?,?,?,?,?,?)",(PreparedStatement ps, MonitorInfo monitorInfo) -> {ps.setString(1, monitorInfo.getCar());ps.setString(2, monitorInfo.getMonitorId());ps.setString(3, monitorInfo.getRoadId());ps.setDouble(4, monitorInfo.getSpeed());ps.setInt(5, monitorInfo.getSpeedLimit());ps.setLong(6, monitorInfo.getActionTime());},JdbcExecutionOptions.builder().withBatchSize(1).withBatchIntervalMs(5000).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://hadoop10:3306/yangyulin?useSSL=false&useUnicode=true&characterEncoding=utf8").withDriverName("com.mysql.jdbc.Driver").withUsername("root").withPassword("0000").build()));env.execute();}
}
MonitorInfo.java:
package bean;//这是郭亚超的java豆
public class MonitorInfo {private Long actionTime;private String monitorId;private String cameraId;private String car;private Double speed; //车辆通过卡口的实际车速private String roadId;private String areaId;private Integer speedLimit; //卡口的限速public MonitorInfo() {}public MonitorInfo(Long actionTime, String monitorId, String cameraId, String car, Double speed, String roadId, String areaId) {this.actionTime = actionTime;this.monitorId = monitorId;this.cameraId = cameraId;this.car = car;this.speed = speed;this.roadId = roadId;this.areaId = areaId;}public MonitorInfo(Long actionTime, String monitorId, String cameraId, String car, Double speed, String roadId, String areaId, Integer speedLimit) {this.actionTime = actionTime;this.monitorId = monitorId;this.cameraId = cameraId;this.car = car;this.speed = speed;this.roadId = roadId;this.areaId = areaId;this.speedLimit = speedLimit;}/*** 获取** @return actionTime*/public Long getActionTime() {return actionTime;}/*** 设置** @param actionTime*/public void setActionTime(Long actionTime) {this.actionTime = actionTime;}/*** 获取** @return monitorId*/public String getMonitorId() {return monitorId;}/*** 设置** @param monitorId*/public void setMonitorId(String monitorId) {this.monitorId = monitorId;}/*** 获取** @return cameraId*/public String getCameraId() {return cameraId;}/*** 设置** @param cameraId*/public void setCameraId(String cameraId) {this.cameraId = cameraId;}/*** 获取** @return car*/public String getCar() {return car;}/*** 设置** @param car*/public void setCar(String car) {this.car = car;}/*** 获取** @return speed*/public Double getSpeed() {return speed;}/*** 设置** @param speed*/public void setSpeed(Double speed) {this.speed = speed;}/*** 获取** @return roadId*/public String getRoadId() {return roadId;}/*** 设置** @param roadId*/public void setRoadId(String roadId) {this.roadId = roadId;}/*** 获取** @return areaId*/public String getAreaId() {return areaId;}/*** 设置** @param areaId*/public void setAreaId(String areaId) {this.areaId = areaId;}/*** 获取** @return speedLimit*/public Integer getSpeedLimit() {return speedLimit;}/*** 设置** @param speedLimit*/public void setSpeedLimit(Integer speedLimit) {this.speedLimit = speedLimit;}public String toString() {return "MonitorInfo{actionTime = " + actionTime + ", monitorId = " + monitorId + ", cameraId = " + cameraId + ", car = " + car + ", speed = " + speed + ", roadId = " + roadId + ", areaId = " + areaId + ", speedLimit = " + speedLimit + "}";}
}
java bean的生成使用了Idea插件。
代码解释
这段代码实现了对车辆超速信息的实时监控和存储。代码解释如下:1. 导入所需的类和包。2. 创建`StreamExecutionEnvironment`实例。3. 创建Kafka的连接配置,并设置相关属性。4. 创建一个`FlinkKafkaConsumer`,用于从Kafka主题中接收数据流。5. 使用`map`函数将接收到的文本数据转换为`MonitorInfo`对象。6. 使用`filter`函数对超速的车辆进行过滤。- 在`open`方法中,建立与数据库的连接,并准备查询语句。- 在`filter`方法中,根据卡口ID查询对应的限速值,并将查询结果设置到`MonitorInfo`对象中。- 如果无法查询到限速值,则将限速值设置为默认值60。- 判断车辆的实际速度是否超过限速值的10%(超速10%判定为超速),返回布尔值。7. 在`addSink`中使用`JdbcSink.sink()`方法将超速的车辆信息写入到MySQL数据库。- 设置插入数据的SQL语句,使用占位符表示待填充的参数。- 使用lambda表达式定义参数填充逻辑,将`MonitorInfo`对象中的字段值设置到预编译语句中的对应位置。- 使用`JdbcExecutionOptions`设置批处理大小和间隔时间。- 使用`JdbcConnectionOptions`设置数据库连接信息。8. 调用`env.execute()`方法启动Flink程序的执行。总体来说,该代码通过从Kafka接收车辆数据流,并对超速的车辆进行监控和存储。在过滤阶段,根据卡口ID查询对应的限速值,并判断车辆的实际速度是否超过限速值的10%。超速的车辆信息将被写入到MySQL数据库中。
以上分析来自ChatGPT3.5,由我整理完善。
测试流程
测试数据
1686647522,0002,1,豫A99999,100.5,01,20
1686647522,0002,1,豫A99999,80.8,02,20
1686647522,0002,1,豫A99999,90.5,03,20
1686647522,0002,1,豫A99999,90.4,03,20
通过kafka发送
限速对照表
这里我写错了,当时road_id和area_id没有分清楚,应该是按照道路ID匹配限速,我当成了区域ID,但是不影响需求的实现流程。
超速车辆已经写入MySQL: