配置网站域名/长沙网站建站模板
系列文章目录
Flink项目实战篇 基于Flink的城市交通监控平台(上)
Flink项目实战篇 基于Flink的城市交通监控平台(下)
文章目录
- 系列文章目录
- 1. 项目整体介绍
- 1.1 项目架构
- 1.2 项目数据流
- 1.3 项目主要模块
- 2. 项目数据字典
- 2.1 卡口车辆采集数据
- 2.2 城市交通管理数据表
- 2.3 车辆轨迹数据表
- 3. 实时卡口监控分析
- 3.1 创建Maven项目
- 3.2 准备数据
- 3.3 实时车辆超速监控
- 3.4 实时卡口拥堵情况监控
- 3.5 实时最通畅的TopN卡口
1. 项目整体介绍
近几年来,随着国内经济的快速发展,高速公路建设步伐不断加快,全国机动车辆、驾驶员数量迅速增长,交通管理工作日益繁重,压力与日俱增。为了提高公安交通管理工作的科学化、现代化水平,缓解警力不足,加强和保障道路交通的安全、有序和畅通,减少道路交通违法和事故的发生,全国各地建设和使用了大量的“电子警察”、“高清卡口”、“固定式测速”、“区间测速”、“便携式测速”、“视频监控”、“预警系统”、“能见度天气监测系统”、“LED信息发布系统”等交通监控系统设备。尽管修建了大量的交通设施,增加了诸多前端监控设备,但交通拥挤阻塞、交通安全状况仍然十分严重。由于道路上交通监测设备种类和生产厂家繁多,目前还没有一个统一的数据采集和交换标准,无法对所有的设备、数据进行统一、高效的管理和应用,造成各种设备和管理软件混用的局面,给使用单位带来了很多不便,使得国家大量的基础建设投资未达到预期的效果。各交警支队的设备大都采用本地的数据库管理,交警总队无法看到各支队的监测设备及监测信息,严重影响对全省交通监测的宏观管理;目前网络状况为设备专网、互联网、公安网并存的复杂情况,需要充分考虑公安网的安全性,同时要保证数据的集中式管理;监控数据需要与“六合一”平台、全国机动车稽查布控系统等的数据对接,迫切需要一个全盘考虑面向交警交通行业的智慧交通管控指挥平台系统。
智慧交通管控指挥平台建成后,达到了以下效果目标:
- 交通监视和疏导:通过系统将监视区域内的现场图像传回指挥中心,使管理人员直接掌握车辆排队、堵塞、信号灯等交通状况,及时调整信号配时或通过其他手段来疏导交通,改变交通流的分布,以达到缓解交通堵塞的目的。
- 交通警卫:通过突发事件的跟踪,提高处置突发事件的能力。
- 建立公路事故、事件预警系统的指标体系及多类分析预警模型,实现对高速公路通行环境、交通运输对象、交通运输行为的综合分析和预警,建立真正意义上的分析及预警体系。
- 及时准确地掌握所监视路口、路段周围的车辆、行人的流量、交通治安情况等,为指挥人员提供迅速直观的信息从而对交通事故和交通堵塞做出准确判断并及时响应。
- 收集、处理各类公路网动静态交通安全信息,分析研判交通安全态势和事故隐患,并进行可视化展示和预警提示。
- 提供接口与其他平台信息共享和关联应用,基于各类动静态信息的大数据分析处理,实现交通违法信息的互联互通、源头监管等功能。
1.1 项目架构
本项目是与公安交通管理综合应用平台、机动车缉查布控系统等对接的,并且基于交通部门现有的数据平台上,进行的数据实时分析项目。
-
卡口:道路上用于监控的某个点,可能是十字路口,也可能是高速出口等。
-
通道:每个卡口上有多个摄像头,每个摄像头有拍摄的方向。这些摄像头也叫通道。
-
“违法王“车辆: 该车辆违法未处理超过50次以上的车。
-
摄像头拍照识别:
(1)一次拍照识别:经过卡口摄像头进行的识别,识别对象的车辆号牌信息、车辆号牌颜色信息等,基于车辆号牌和车辆颜色信息,能够实现基本的违法行为辨识、车辆黑白名单比对报警等功能。
(2)二次拍照识别:可以通过时间差和距离自动计算出车辆的速度。
1.2 项目数据流
实时处理流程如下:
http请求 -->数据采集接口–>数据目录–> flume监控目录[监控的目录下的文件是按照日期分的] -->Kafka -->Flink分析数据 --> Mysql[实时监控数据保存]
1.3 项目主要模块
2. 项目数据字典
2.1 卡口车辆采集数据
卡口数据通过Flume采集过来之后存入Kafka中,其中数据的格式为:
(`action_time` long --摄像头拍摄时间戳,精确到秒, `monitor_id` string --卡口号, `camera_id` string --摄像头编号, `car` string --车牌号码, `speed` double --通过卡扣的速度, `road_id` string --道路id, `area_id` string --区域id,
)
其中每个字段之间使用逗号隔开。
区域ID代表:一个城市的行政区域。
摄像头编号:一个卡口往往会有多个摄像头,每个摄像头都有一个唯一编号。
道路ID:城市中每一条道路都有名字,比如:蔡锷路。交通部门会给蔡锷路一个唯一编号。
2.2 城市交通管理数据表
Mysql数据库中有两张表是由城市交通管理平台提供的,本项目需要读取这两张表的数据来进行分析计算。
(1)城市区域表: t_area_info
DROP TABLE IF EXISTS `t_area_info`;
CREATE TABLE `area_info` (`area_id` varchar(255) DEFAULT NULL,`area_name` varchar(255) DEFAULT NULL
)
--导入数据
INSERT INTO `t_area_info` VALUES ('01', '海淀区');
INSERT INTO `t_area_info` VALUES ('02', '昌平区');
INSERT INTO `t_area_info` VALUES ('03', '朝阳区');
INSERT INTO `t_area_info` VALUES ('04', '顺义区');
INSERT INTO `t_area_info` VALUES ('05', '西城区');
INSERT INTO `t_area_info` VALUES ('06', '东城区');
INSERT INTO `t_area_info` VALUES ('07', '大兴区');
INSERT INTO `t_area_info` VALUES ('08', '石景山');
(2)城市“违法”车辆列表:
城市“违法”车辆,一般是指需要进行实时布控的违法车辆。
DROP TABLE IF EXISTS `t_violation_list`;
CREATE TABLE `t_violation_list` (`id` int(11) NOT NULL AUTO_INCREMENT,`car` varchar(255) DEFAULT NULL,`violation` varchar(1000) DEFAULT NULL,`create_time` bigint(20) DEFAULT NULL,`detail` varchar(1000) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
(3)城市卡口限速信息表:
城市中有些卡口有限制设置,一般超过当前限速的10%要扣分。
DROP TABLE IF EXISTS `t_monitor_info`;
CREATE TABLE `t_monitor_info` (`area_id` varchar(255) DEFAULT NULL,`road_id` varchar(255) NOT NULL,`monitor_id` varchar(255) NOT NULL,`speed_limit` int(11) DEFAULT NULL,PRIMARY KEY (`area_id`,`road_id`,`monitor_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
--导入数据
INSERT INTO `t_monitor_info` VALUES ('01','10','0000','60');
INSERT INTO `t_monitor_info` VALUES ('02','11','0001','60');
INSERT INTO `t_monitor_info` VALUES ('01','12','0002','80');
INSERT INTO `t_monitor_info` VALUES ('03','13','0003','100');
2.3 车辆轨迹数据表
在智能车辆布控模块中,需要保存一些车辆的实时行驶轨迹,为了方便其他部门和项目方便查询获取,我们在Mysql数据库设计一张车辆实时轨迹表。如果数据量太多,需要设置在HBase中。
DROP TABLE IF EXISTS `t_track_info`;
CREATE TABLE `t_track_info` (`id` int(11) NOT NULL AUTO_INCREMENT,`car` varchar(255) DEFAULT NULL,`action_time` bigint(20) DEFAULT NULL,`monitor_id` varchar(255) DEFAULT NULL,`road_id` varchar(255) DEFAULT NULL,`area_id` varchar(255) DEFAULT NULL,`speed` double DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
3. 实时卡口监控分析
首先要实现的是实时卡口监控分析,由于前面课程项目中已经讲解了数据的ETL,本项目我们省略数据采集等ETL操作。我们将读取Kafka中的数据集来进行分析。
项目主体用Scala编写,采用IDEA作为开发环境进行项目编写,采用maven作为项目构建和管理工具。首先我们需要搭建项目框架。
3.1 创建Maven项目
打开IDEA,创建一个maven项目,我们整个项目需要的工具的不同版本可能会对程序运行造成影响,所以应该在porm.xml文件的最上面声明所有工具的版本信息。
在pom.xml中加入以下配置:
<properties><flink.version>1.9.1</flink.version><scala.binary.version>2.11</scala.binary.version><kafka.version>0.11.0.0</kafka.version>
</properties>
(1)添加项目依赖
对于整个项目而言,所有模块都会用到flink相关的组件,添加Flink相关组件依赖:
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_${scala.binary.version}</artifactId><version>${kafka.version}</version></dependency><dependency><groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.8.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
</dependencies>
(2)添加Scala和打包插件
<build>
<plugins><!-- 该插件用于将Scala代码编译成class文件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.4.6</version><executions><execution><!-- 声明绑定到maven的compile阶段 --><goals><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin>
</plugins>
</build>
3.2 准备数据
由于在前面的课程中已经学过数据的采集和ETL,本项目不再赘述,现在我们直接随机生成数据到文件中(方便测试),同时也写入Kafka。
项目中模拟车辆速度数据和车辆经过卡扣个数使用到了高斯分布,高斯分布就是正态分布。“正态分布”(Normal Distribution)可以描述所有常见的事物和现象:正常人群的身高、体重、考试成绩、家庭收入等等。这里的描述是什么意思呢?就是说这些指标背后的数据都会呈现一种中间密集、两边稀疏的特征。以身高为例,服从正态分布意味着大部分人的身高都会在人群的平均身高上下波动,特别矮和特别高的都比较少见,正态分布非常常见。
基于以上所以需要在pom.xml中导入高斯分布需要的依赖包:
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-math3</artifactId><version>3.6.1</version>
</dependency>
生成高斯标准分布的代码如下:
//获取随机数生成器
val generator: JDKRandomGenerator = new JDKRandomGenerator()
//随机生成高斯分布的数据
val grg: GaussianRandomGenerator = new GaussianRandomGenerator(generator)
//获取标准正态分布的数据
println(s"随机生成数据为:${grg.nextNormalizedDouble()}")
模拟生成数据的代码如下:
/*** 模拟生成数据,这里将数据生产到Kafka中,同时生成到文件中*/
object GeneratorData {def main(args: Array[String]): Unit = {//创建文件流val pw = new PrintWriter("./data/traffic_data")//创建Kafka 连接propertiesval props = new Properties()props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")val random = new Random()//创建Kafka produerval producer = new KafkaProducer[String,String](props)//车牌号使用的地区val locations = Array[String]("京","津","京","鲁","京","京","冀","京","京","粤","京","京")//模拟车辆个数,这里假设每日有30万辆车信息for(i <- 1 to 30000){//模拟每辆车的车牌号,"%05d".format(100000) %05d,d代表数字,5d代表数字长度为5位,不足位数前面补0 。 例如:京A88888val car =locations(random.nextInt(12))+(65+random.nextInt(26)).toChar+"%05d".format(random.nextInt(100000))//模拟车辆经过的卡扣数,使用高斯分布,假设正常每辆车每日经过卡扣有30个val generator = new GaussianRandomGenerator(new JDKRandomGenerator())val monitorThreshold: Int = 1+(generator.nextNormalizedDouble()*30).abs.toInt //generator.nextNormalizedDouble() 处于-1 ~ 1 之间//模拟拍摄时间val day = DateUtils.getTodayDate()var hour = DateUtils.getHour()var flag = 0for(j <- 1 to monitorThreshold){flag+=1//模拟monitor_id ,4位长度val monitorId = "%04d".format(random.nextInt(9))//模拟camear_id ,5为长度val camearId = "%05d".format(random.nextInt(100000))//模拟road_id ,2为长度val roadId = "%02d".format(random.nextInt(50))//模拟area_id ,2为长度val areaId = "%02d".format(random.nextInt(8))//模拟速度 ,使用高斯分布,速度大多位于90 左右val speed = "%.1f".format(60 + (generator.nextNormalizedDouble()*30).abs)//模拟action_timeif(flag % 30 == 0 && flag != 0 ){hour = (hour.toInt+1).toString}val currentTime = day+" "+hour+":"+DateUtils.getMinutesOrSeconds()+":"+DateUtils.getMinutesOrSeconds()//获取action_time 时间戳val actionTime: Long = DateUtils.getTimeStamp(currentTime)var oneInfo = s"$actionTime,$monitorId,$camearId,$car,$speed,$roadId,$areaId"println(s"oneInfo = $oneInfo")//写入文件:pw.write(oneInfo)pw.println()//写入kafka:producer.send(new ProducerRecord[String,String]("traffic-topic",oneInfo))}}pw.flush()pw.close()producer.close()}
}
3.3 实时车辆超速监控
在城市交通管理数据库中,存储了每个卡口的限速信息,但是不是所有卡口都有限速信息,其中有一些卡口有限制。Flink中有广播状态流,JobManger统一管理,TaskManger中正在运行的Task不可以修改这个广播状态。只能定时更新(自定义Source)。
我们通过实时计算,需要把所有超速超过10%的车辆找出来,并写入关系型数据库中。超速结果表如下:
DROP TABLE IF EXISTS `t_speeding_info`;
CREATE TABLE `t_speeding_info` (`id` int(11) NOT NULL AUTO_INCREMENT,`car` varchar(255) NOT NULL,`monitor_id` varchar(255) DEFAULT NULL,`road_id` varchar(255) DEFAULT NULL,`real_speed` double DEFAULT NULL,`limit_speed` int(11) DEFAULT NULL,
`action_time` bigint(20) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
在当前需求中,需要不定时的从数据库表中查询所有限速的卡口,再根据限速的卡口列表来实时的判断是否存在超速的车辆,如果找到超速的车辆,把这些车辆超速的信息保存到Mysql数据库的超速违章记录表中t_speeding_info。
我们把查询限速卡口列表数据作为一个事件流,车辆通行日志数据作为第二个事件流。广播状态可以用于通过一个特定的方式来组合并共同处理两个事件流。第一个流的事件被广播到另一个operator的所有并发实例,这些事件将被保存为状态。另一个流的事件不会被广播,而是发送给同一个operator的各个实例,并与广播流的事件一起处理。广播状态非常适合两个流中一个吞吐大,一个吞吐小,或者需要动态修改处理逻辑的情况。
我们对两个流使用了connect()方法,并在连接之后调用BroadcastProcessFunction接口处理两个流:
- processBroadcastElement()方法:每次收到广播流的记录时会调用。将接收到的卡口限速记录放入广播状态中;
- processElement()方法:接受到车辆通行日志流的每条消息时会调用。并能够对广播状态进行
只读
操作,以防止导致跨越类中多个并发实例的不同广播状态的修改。
代码如下:
/*** 监控超速的车辆信息* 思路:从mysql中读取卡扣下的限速信息,通过广播流进行广播,然后与从kafka中读取的车流量监控事件流进行connect处理* 广播状态操作步骤:* 1).读取广播流的DStream数据* 2).将以上DStream数据广播出去* 3).主流与广播流进行Connect关联,调用 process 底层API处理* 4).实现process方法中 BroadcastProcessFunction 类下的两个方法进行数据处理*/
object OutOfSpeedMonitor {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//导入隐式转换import org.apache.flink.streaming.api.scala._env.setParallelism(1)val props = new Properties()props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")props.setProperty("group.id","testgroup1")props.setProperty("key.deserializer",classOf[StringDeserializer].getName)props.setProperty("value.deserializer",classOf[StringDeserializer].getName)props.setProperty("auto.offset.reset","latest")//读取Kafka中的监控车辆事件流val mainDStream: DataStream[TrafficLog] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props).setStartFromEarliest())
// val mainDStream: DataStream[TrafficLog] = env.socketTextStream("mynode5",9999).map(line => {val arr: Array[String] = line.split(",")TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))})//广播状态流 - 卡扣限速信息val broadCastStream: BroadcastStream[MonitorLimitSpeedInfo] = env.addSource(new JdbcReadSource("MonitorLimitSpeedInfo")).map(one => {one.asInstanceOf[MonitorLimitSpeedInfo]}).broadcast(GlobalConstant.MONITOR_LIMIT_SPEED_DESCRIPTOR)val outOfSpeedCarInfoDStream: DataStream[OutOfSpeedCarInfo] = mainDStream.connect(broadCastStream).process(new BroadcastProcessFunction[TrafficLog, MonitorLimitSpeedInfo, OutOfSpeedCarInfo] {//当有车辆监控事件时会被调用override def processElement(trafficLog: TrafficLog, ctx: BroadcastProcessFunction[TrafficLog, MonitorLimitSpeedInfo, OutOfSpeedCarInfo]#ReadOnlyContext, out: Collector[OutOfSpeedCarInfo]): Unit = {//道路_卡扣val roadMonitor = trafficLog.roadId+"_"+trafficLog.monitorIdval info: MonitorLimitSpeedInfo = ctx.getBroadcastState(GlobalConstant.MONITOR_LIMIT_SPEED_DESCRIPTOR).get(roadMonitor)if (info != null) {//获取当前车辆真实的速度val realSpeed: Double = trafficLog.speed//获取当前卡扣限速信息val limitSpeed: Int = info.speedLimit//速度超过限速10% 就是超速车辆if (realSpeed > limitSpeed * 1.1) {out.collect(OutOfSpeedCarInfo(trafficLog.car, trafficLog.monitorId, trafficLog.roadId, realSpeed, limitSpeed, trafficLog.actionTime))}}}//每次收到广播流数据时,都会被调用,将接收到的卡扣限速记录放入到广播状态中override def processBroadcastElement(monitorLimitSpeedInfo: MonitorLimitSpeedInfo, ctx: BroadcastProcessFunction[TrafficLog, MonitorLimitSpeedInfo, OutOfSpeedCarInfo]#Context, out: Collector[OutOfSpeedCarInfo]): Unit = {val bcState: BroadcastState[String, MonitorLimitSpeedInfo] = ctx.getBroadcastState(GlobalConstant.MONITOR_LIMIT_SPEED_DESCRIPTOR)//key : 道路_卡扣 value :monitorLimitSpeedInfobcState.put(monitorLimitSpeedInfo.roadId+"_"+monitorLimitSpeedInfo.monitorId, monitorLimitSpeedInfo)}})//将超速车辆的结果保存到 mysql 表 t_speeding_info 中。val sink: JdbcWriteSink[OutOfSpeedCarInfo] = new JdbcWriteSink("OutOfSpeedCarInfo")outOfSpeedCarInfoDStream.addSink(sink)env.execute()}
}
3.4 实时卡口拥堵情况监控
卡口的实时拥堵情况,其实就是通过卡口的车辆平均车速,为了统计实时的平均车速,这里设定一个滑动窗口,窗口长度是为5分钟,滑动步长为1分钟。平均车速=当前窗口内通过车辆的车速之和 / 当前窗口内通过的车辆数量 ;并且在Flume采集数据的时候,我们发现数据可能出现时间乱序问题,最长迟到5秒。
实时卡口平均速度需要保存到Mysql数据库中,结果表设计为:
DROP TABLE IF EXISTS `t_average_speed`;
CREATE TABLE `t_average_speed` (`id` int(11) NOT NULL AUTO_INCREMENT,`start_time` bigint(20) DEFAULT NULL,`end_time` bigint(20) DEFAULT NULL,`monitor_id` varchar(255) DEFAULT NULL,`avg_speed` double DEFAULT NULL,`car_count` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
完整的代码:
object MonitorAvgSpeedMonitor {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.streaming.api.scala._val props = new Properties()props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")props.setProperty("group.id","testgroup2")props.setProperty("key.deserializer",classOf[StringDeserializer].getName)props.setProperty("value.deserializer",classOf[StringDeserializer].getName)//使用时间为 事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//设置线程为1env.setParallelism(1)// val mainDStream: DataStream[TrafficLog] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props))val mainDStream: DataStream[TrafficLog] = env.socketTextStream("mynode5",9999).map(line => {val arr: Array[String] = line.split(",")val actionTime = arr(0).toLongval monitorId = arr(1)val cameraId = arr(2)val car = arr(3)val speed = arr(4).toDoubleval roadId = arr(5)val areaId = arr(6)TrafficLog(actionTime, monitorId, cameraId, car, speed, roadId, areaId)}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TrafficLog](Time.seconds(5)) {override def extractTimestamp(element: TrafficLog): Long = element.actionTime})mainDStream.keyBy(_.monitorId).timeWindow(Time.minutes(5),Time.minutes(1))//统计每个卡扣通过车辆数,统计每个卡扣下的车辆总速度和,使用增量函数.aggregate(new AggregateFunction[TrafficLog,(Int,Double),(Int,Double)] {override def createAccumulator(): (Int, Double) = (0,0.0)override def add(value: TrafficLog, accumulator: (Int, Double)): (Int, Double) = (accumulator._1+1,accumulator._2+value.speed)override def getResult(accumulator: (Int, Double)): (Int, Double) = accumulatoroverride def merge(a: (Int, Double), b: (Int, Double)): (Int, Double) = (a._1+b._1,a._2+b._2)},new ProcessWindowFunction[(Int,Double),MonitorAvgSpeedInfo,String,TimeWindow] {override def process(key: String, context: Context, elements: Iterable[(Int, Double)], out: Collector[MonitorAvgSpeedInfo]): Unit = {val monitorId = keyval avgSpeed = (elements.last._2/elements.last._1).formatted("%.2f").toDoubleout.collect(new MonitorAvgSpeedInfo(context.window.getStart,context.window.getEnd,monitorId,avgSpeed,elements.last._1))}}).addSink(new JdbcWriteSink[MonitorAvgSpeedInfo]("MonitorAvgSpeedInfo"))env.execute()}
3.5 实时最通畅的TopN卡口
所谓的最通畅的卡口,其实就是当时的车辆数量最少的卡口。这里有两种实现方式,一种是基于上一个功能的基础上再次开启第二个窗口操作,然后使用AllWindowFunction实现一个自定义的TopN函数Top来计算车速排名前3名的卡口,并将排名结果格式化成字符串,便于后续输出。另外一种是使用窗口函数,对滑动窗口内的数据全量计算并排序计算。
(1)基于上个功能基础上,完整的代码:
/*** 基于 "实时卡扣拥堵情况业务" 基础之上进行统计*/
object FindTop5MonitorInfo2 {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.streaming.api.scala._val props = new Properties()props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")props.setProperty("group.id","testgroup2")props.setProperty("key.deserializer",classOf[StringDeserializer].getName)props.setProperty("value.deserializer",classOf[StringDeserializer].getName)//使用时间为 事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//设置线程为1env.setParallelism(1)val mainDStream: DataStream[TrafficLog] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props).setStartFromEarliest())
// val mainDStream: DataStream[TrafficLog] = env.socketTextStream("mynode5",9999).map(line => {val arr: Array[String] = line.split(",")val actionTime = arr(0).toLongval monitorId = arr(1)val cameraId = arr(2)val car = arr(3)val speed = arr(4).toDoubleval roadId = arr(5)val areaId = arr(6)TrafficLog(actionTime, monitorId, cameraId, car, speed, roadId, areaId)}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TrafficLog](Time.seconds(5)) {override def extractTimestamp(element: TrafficLog): Long = element.actionTime})val monitorAvgSpeedDStream: DataStream[MonitorAvgSpeedInfo] = mainDStream.keyBy(_.monitorId).timeWindow(Time.minutes(5), Time.minutes(1))//统计每个卡扣通过车辆数,统计每个卡扣下的车辆总速度和,使用增量函数.aggregate(new AggregateFunction[TrafficLog, (Int, Double), (Int, Double)] {override def createAccumulator(): (Int, Double) = (0, 0.0)override def add(value: TrafficLog, accumulator: (Int, Double)): (Int, Double) = (accumulator._1 + 1, accumulator._2 + value.speed)override def getResult(accumulator: (Int, Double)): (Int, Double) = accumulatoroverride def merge(a: (Int, Double), b: (Int, Double)): (Int, Double) = (a._1 + b._1, a._2 + b._2)},new ProcessWindowFunction[(Int, Double), MonitorAvgSpeedInfo, String, TimeWindow] {override def process(key: String, context: Context, elements: Iterable[(Int, Double)], out: Collector[MonitorAvgSpeedInfo]): Unit = {val monitorId = keyval avgSpeed = (elements.last._2 / elements.last._1).formatted("%.2f").toDoubleout.collect(new MonitorAvgSpeedInfo(context.window.getStart, context.window.getEnd, monitorId, avgSpeed, elements.last._1))}}).assignAscendingTimestamps(masi => {masi.endTime})//设置下一个窗口的时间//这里设置一个滚动窗口,每隔1分钟,对以上所有卡扣对应的平均速度进行排序,得到对应的结果monitorAvgSpeedDStream.timeWindowAll(Time.minutes(1)).process(new ProcessAllWindowFunction[MonitorAvgSpeedInfo,String,TimeWindow] {override def process(context: Context, elements: Iterable[MonitorAvgSpeedInfo], out: Collector[String]): Unit = {val builder = new StringBuilder(s"窗口起始时间:${context.window.getStart} - ${context.window.getEnd},最拥堵的前3个卡扣信息如下:")val infoes: List[MonitorAvgSpeedInfo] = elements.toList.sortWith((masi1,masi2)=>{masi1.avgSpeed > masi2.avgSpeed}).take(3)for(masi <- infoes){builder.append(s"monitorId : ${masi.monitorId},avgSpeed : ${masi.avgSpeed} |")}out.collect(builder.toString())}}).print()env.execute()}
}
(2)滑动窗口全量计算:
object FindTop5MonitorInfo1 {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//导入隐式转换import org.apache.flink.streaming.api.scala._//设置并行度为1env.setParallelism(1)//设置事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val props = new Properties()props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")props.setProperty("group.id","testgroup3")props.setProperty("key.deserializer",classOf[StringDeserializer].getName)props.setProperty("value.deserializer",classOf[StringDeserializer].getName)val mainDStream: DataStream[TrafficLog] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props).setStartFromEarliest())
// val mainDStream: DataStream[TrafficLog] = env.socketTextStream("mynode5",9999).map(line => {val arr: Array[String] = line.split(",")TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TrafficLog](Time.seconds(5)) {override def extractTimestamp(element: TrafficLog): Long = element.actionTime})mainDStream.timeWindowAll(Time.minutes(1)).aggregate(//返回数据为 Map[String,Double] => Map[卡扣,平均速度]new AggregateFunction[TrafficLog,Map[String,(Int,Double)],Map[String,Double]]{//初始化一个Map[卡扣,(当前卡扣对应总车辆数,当前卡扣下所有车辆总速度和)]override def createAccumulator(): Map[String, (Int, Double)] = Map()override def add(value: TrafficLog, accMap: Map[String, (Int, Double)]): Map[String, (Int, Double)] = {//获取当前一条数据的monitorIDval monitorId: String = value.monitorIdif(accMap.contains(monitorId)){//当前map中包含此卡扣accMap.put(monitorId,(accMap.get(monitorId).get._1+1,accMap.get(monitorId).get._2+value.speed))}else{accMap.put(monitorId,(1,value.speed))}accMap}override def getResult(accumulator: Map[String,(Int, Double)]): Map[String, Double] = {accumulator.map(tp=>{val monitorId: String = tp._1val totalCarCount: Int = tp._2._1val totalSpeed: Double = tp._2._2(monitorId,(totalSpeed/totalCarCount).formatted("%.2f").toDouble)})}//合并不同线程处理的数据override def merge(a: Map[String, (Int, Double)], b: Map[String, (Int, Double)]): Map[String, (Int, Double)] = {b.foreach(tp=>{val monitorId: String = tp._1val carCount: Int = tp._2._1val totalSpeed: Double = tp._2._2if(a.contains(monitorId)){//第一个map中包含当前卡扣数据a.put(monitorId,(a.get(monitorId).get._1 + carCount,a.get(monitorId).get._2+totalSpeed))}else{//第一个map中不包含当前卡扣数据a.put(monitorId,tp._2)}})a}},new AllWindowFunction[Map[String, Double],String,TimeWindow] {override def apply(window: TimeWindow, input: scala.Iterable[mutable.Map[String, Double]], out: Collector[String]): Unit = {val tuples: List[(String, Double)] = input.last.toList.sortWith((tp1,tp2)=>{tp1._2 > tp2._2}).take(3)val returnStr = new StringBuilder(s"窗口起始时间:${window.getStart} - ${window.getEnd} ,最拥堵前3个卡扣信息 :")for(tp <- tuples){returnStr.append(s"monitorId = ${tp._1} ,avgSpeed = ${tp._2} |")}out.collect(returnStr.toString())}}).print()env.execute()
相关文章:

Flink项目实战篇 基于Flink的城市交通监控平台(上)
系列文章目录 Flink项目实战篇 基于Flink的城市交通监控平台(上) Flink项目实战篇 基于Flink的城市交通监控平台(下) 文章目录 系列文章目录1. 项目整体介绍1.1 项目架构1.2 项目数据流1.3 项目主要模块 2. 项目数据字典2.1 卡口…...

thinkcmf 文件包含 x1.6.0-x2.2.3 已亲自复现
thinkcmf 文件包含 x1.6.0-x2.2.3 CVE-2019-16278 已亲自复现 漏洞名称漏洞描述影响版本 漏洞复现环境搭建漏洞利用 修复建议总结 漏洞名称 漏洞描述 ThinkCMF是一款基于PHPMYSQL开发的中文内容管理框架,底层采用ThinkPHP3.2.3构建。ThinkCMF提出灵活的应用机制&a…...

本地部署 text-generation-webui
本地部署 text-generation-webui 0. 背景1. text-generation-webui 介绍2. 克隆代码3. 创建虚拟环境4. 安装 pytorch5. 安装 CUDA 运行时库6. 安装依赖库7. 启动 Web UI8. 访问 Web UI9. OpenAI 兼容 API 0. 背景 一直喜欢用 FastChat 本地部署大语言模型,今天试一…...

C语言实验1:C程序的运行环境和运行C程序的方法
一、算法原理 这是学C语言的入门,并不需要很高深的知识,一个hello world 或者一个简单的加法即可 二、实验要求 了解所用的计算机系统的基本操作方法,学会独立使用该系统。 了解在该系统上如何编辑、编译、连接和运行一个C程序。 通过运…...

「微服务」微服务架构中的数据一致性
在微服务中,一个逻辑上原子操作可以经常跨越多个微服务。即使是单片系统也可能使用多个数据库或消息传递解决方案。使用多个独立的数据存储解决方案,如果其中一个分布式流程参与者出现故障,我们就会面临数据不一致的风险 - 例如在未下订单的情…...

ARCGIS PRO SDK 要素空间关系
一、要素与要素查询,返回的是bool值 1、 Touches 判断几何要素是否接触 Touches 如果 geometry1 与 geometry2 接触,则返回 true,否则 false。 touches GeometryEngine.Instance.Touches(Geometry1, Geometry2) 2、…...

Python面向对象高级与Python的异常、模块以及包管理
Python面向对象高级与Python的异常、模块以及包管理 一、Python中的继承 1、什么是继承 我们接下来来聊聊Python代码中的“继承”:类是用来描述现实世界中同一组事务的共有特性的抽象模型,但是类也有上下级和范围之分,比如:生物 => 动物 => 哺乳动物 => 灵长型…...

Python 爬取 哔站视频弹幕 并实现词云图可视化
嗨喽,大家好呀~这里是爱看美女的茜茜呐 环境介绍: python 3.8 解释器 pycharm 编辑器 第三方模块: requests >>> pip install requests protobuf >>> pip install protobuf 如何安装python第三方模块: win R 输入 cmd 点击确定, 输入安装命…...

BP神经网络详细原理,BP神经网络训练界面详解,基于BP神经网络的公司财务风险分类
目录 摘要 BP神经网络参数设置及各种函数选择 参数设置 训练函数 传递函数 学习函数 性能函数 显示函数 前向网络创建函数 BP神经网络训练窗口详解 训练窗口例样 训练窗口四部详解 基于BP神经网络的公司财务风险分类 完整代码下载链接:基于BP神经网络的公司财务风险分类(代码…...

C++ DAY1 作业
1.定义自己的命名空间myspace,并在myspace中定义一个字符串,并实现求字符串长度 #include <iostream>using namespace std; namespace myspace {string str;int length_fun(){getline(cin,str);int i 0;while(str[i] ! \0){i;}return i;}} using…...

「微服务模式」七种微服务反模式
什么是微服务 流行语经常为进化的概念提供背景,并且需要一个良好的“标签”来促进对话。微服务是一个新的“标签”,它定义了我个人一直在发现和使用的领域。文章和会议描述了一些事情,我慢慢意识到,过去几年我一直在发展自己的个人…...

运动耳机哪款性价比最高、性价比最高的运动耳机推荐
近年来,运动蓝牙耳机备受欢迎,成为人们健身时的必备时尚单品。随着蓝牙耳机的不断发展,市场上可供选择的产品种类繁多,因此挑选一款适合自己的蓝牙耳机并不困难。然而,并非每款耳机都适合户外或者运动场景下的使用&…...

FreeRTOS软件定时器
一、简介 二、实验 //创建一个单次定时器和一个周期定时器,打开两个定时器然后等待10s关闭定时器,此时会发现单次定时器打印1次停止,周期定时器打印5次停止 #include "FreeRTOS_demo.h"#define START_TASK_PRIO 1 #define…...

【Java集合类不安全示例】
文章目录 一、List二、Set三、Map 提示:以下是本篇文章正文内容,下面案例可供参考 一、List 代码如下(示例): public class ZZZZZZZZZZ {public static void main(String[] args) {// ArrList 非线程安全的集合List&l…...

cpp_07_类型转换构造_析构函数_深拷贝_静态成员
1 类型转换构造函数 1.1 why? 基本类型之间的转换,编译器内置转换规则:int -> double 类类型之间的转换,编译器不知道转换规则,需要用户提供:Cat -> Dog // consconv_why.cpp 为什么需要自定义转换 #includ…...

Java 已死、前端已凉?
文章目录 Java 的现状前端技术的现状分析结论 关于“Java 已死、前端已凉”的言论,这种说法更多地反映了行业对技术趋势的一种情绪化反应,而不一定是基于事实的判断。下面我来具体分析这个话题。 Java 的现状 Java 的普及与稳定性:Java 作为一…...

Calico IP_AUTODETECTION_METHOD
在 Calico 中,IP_AUTODETECTION_METHOD 的配置项用于指定 Calico 如何检测容器的 IP 地址。 一、kubernetes-internal-ip模式 其中,kubernetes-internal-ip 是一种特殊的模式,用于在 Kubernetes 环境中检测容器的 IP 地址。具体作用如下&…...

百分点科技成为中国“数据要素×”生态合作伙伴
12月24日,由中国经济体制改革研究会、中国电子、郑州市人民政府、中国经济改革研究基金会联合主办的中国“数据要素”生态大会在郑州召开,百分点科技受邀出席,并获颁中国“数据要素x”2024年度生态伙伴合作证书。 大会邀请了国家数据局党组成…...

智能优化算法应用:基于孔雀算法3D无线传感器网络(WSN)覆盖优化 - 附代码
智能优化算法应用:基于孔雀算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用:基于孔雀算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.孔雀算法4.实验参数设定5.算法结果6.参考文献7.MA…...

Redis自动部署脚本编写
#!/bin/bash ck_ok() { if [ $? -ne 0 ] then echo "$1 error." exit 1 fi } download_redis() { cd /usr/local/src if [ -f redis-7.0.4.tar.gz ] then echo "当前目录已经存在redis-7.0.4.tar.gz&q…...

Tinker 环境下数据表的用法
如果我们要自己手动创建一个模型文件,最简单的方式是通过 make:model 来创建。 php artisan make:model Article 删除模型文件 rm app/Models/Article.php 创建模型的同时顺便创建数据库迁移 php artisan make:model Article -m Eloquent 表命名约定 在该文件中&am…...

鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之线性布局容器Row组件
鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之线性布局容器Row组件 一、操作环境 操作系统: Windows 10 专业版、IDE:DevEco Studio 3.1、SDK:HarmonyOS 3.1 二、Row组件 沿水平方向布局容器。 子组件 可以包含子组件。 接口 Row(…...

JAVA——JDBC学习
视频连接:https://www.bilibili.com/video/BV1sK411B71e/?spm_id_from333.337.search-card.all.click&vd_source619f8ed6df662d99db4b3673d1d3ddcb 《视频讲解很详细!!推荐》 JDBC(Java DataBase Connectivity Java数据库连…...

Flask 用户信息编辑系统
Flask 用户信息编辑系统 web/templates/user/edit.html {% extends "common/layout_main.html" %} {% block content %} {% include "common/tab_user.html" %} <div class"row m-t user_edit_wrap"><div class"col-lg-12"…...

Spring DefaultListableBeanFactory源码分析
目录 一、概述 二、主要功能 三、核心功能解析 Bean定义的存储结构 ConcurrentHashMap的使用和意义 四、总结 一、概述 DefaultListableBeanFactory 是 Spring 框架中的一个核心类,它继承自AbstractAutowireCapableBeanFactory类,实现了 ListableBeanF…...

关于MySQL、分布式系统、SpringCloud面试题
前言 之前为了准备面试,收集整理了一些面试题。 本篇文章更新时间2023年12月27日。 最新的内容可以看我的原文:https://www.yuque.com/wfzx/ninzck/cbf0cxkrr6s1kniv MySQL 索引 说一下有哪些锁? 行锁有哪些? 性能优化 分库分表…...

2023年中职“网络安全”——B-5:网络安全事件响应(Server2216)
B-5:网络安全事件响应 任务环境说明: 服务器场景:Server2216(开放链接) 用户名:root密码:123456 1、黑客通过网络攻入本地服务器,通过特殊手段在系统中建立了多个异常进程,找出启…...

【论文解读】Learning based fast H.264 to H.265 transcoding
时间: 2015 年 级别: APSIPA 机构: 上海电力大学 摘要 新提出的视频编码标准HEVC (High Efficiency video coding)以其比H.264/AVC更好的编码效率,被工业界和学术界广泛接受和采用。在HEVC实现了约40%的编码效率提升的同时&…...

[vue]Echart使用手册
[vue]Echart使用手册 使用环境Echart的使用Echart所有组件和图表类型Echart 使用方法 使用环境 之前是在JQuery阶段使用Echart,直接引入Echart的js文件即可,现在是在vue中使用,不仅仅时echarts包,还需要安装vue-echarts: "…...

视频人脸识别马赛克处理
文章目录 前言一、实现思路?二、Coding三、实现效果 前言 前面几篇文章我们尝试了使用opencv完成图像人脸识别以及识别后贴图或者打马赛克的方法。 偶尔我们也会有需求在视频中将人脸马赛克化,opencv也提供了相应的方法来实现这个功能。 一、实现思路&a…...