视频分析系统
业务流程
原始数据
video.json
{"rank":1,"title":"《逃出大英博物馆》第二集","dzl":"77.8","bfl":"523.9","zfl":"39000","type":"影视","time":"3-5"}
{"rank":2,"title":"兄弟们这一期真的要看到最后!","dzl":"89.6","bfl":"636.1","zfl":"4246","type":"搞笑","time":"3-5"}
{"rank":3,"title":"全世界学生都痛恨的一种教育方式","dzl":"27.3","bfl":"313.6","zfl":"13000","type":"搞笑","time":"5-8"}
{"rank":4,"title":"这是我玩过最抽象的宝可梦了3","dzl":"29.7","bfl":"193.6","zfl":"4631","type":"搞笑","time":"10+"}
{"rank":5,"title":"我们又又又改造公司了","dzl":"25.0","bfl":"269.7","zfl":"3084","type":"搞笑","time":"10+"}
{"rank":6,"title":"我在尝试当一种很新的美食博主","dzl":"48.0","bfl":"258.7","zfl":"6596","type":"美食","time":"0-3"}
{"rank":100,"title":"很听劝我和男装的自己搞在一起","dzl":"18.7","bfl":"87.8","zfl":"4077","type":"搞笑","time":"0-3"}
{"rank":99,"title":"探秘中国最贵的面!70只大闸蟹做一碗面!到底什么味道?","dzl":"8.8","bfl":"88.2","zfl":"4527","type":"美食","time":"5-8"}
{"rank":7,"title":"土布","dzl":"26.4","bfl":"224.8","zfl":"3291","type":"生活","time":"5-8"}
{"rank":34,"title":"我的工作周报vs我实际的工作","dzl":"18.2","bfl":"162.6","zfl":"21000","type":"生活","time":"3-5"}
{"rank":8,"title":"麒麟9000S性能分析:华为Mate60 Pro能效如何?","dzl":"18.7","bfl":"151.7","zfl":"34000","type":"知识","time":"10+"}
{"rank":98,"title":"Time Vent!十秒带你穿越2023-2009!感受令和与新平成假面骑士的力量叭!","dzl":"14.4","bfl":"124.3","zfl":"6918","type":"动画","time":"0-3"}
{"rank":35,"title":"魔术师的三个阶段","dzl":"12.2","bfl":"180.0","zfl":"3841","type":"影视","time":"3-5"}
{"rank":9,"title":"高斯一败涂地","dzl":"21.3","bfl":"181.3","zfl":"3424","type":"搞笑","time":"10+"}
{"rank":97,"title":"千匹之战!中国车VS特斯拉!","dzl":"7.2","bfl":"117.4","zfl":"15000","type":"知识","time":"10+"}
{"rank":96,"title":"真实事件改编没想到一个综艺节目这么敢拍孩子需要被改造吗?","dzl":"11.2","bfl":"441.4","zfl":"1640","type":"影视","time":"5-8"}
{"rank":36,"title":"哈哈哈哈哈谁是卧底音乐版","dzl":"14.2","bfl":"232.9","zfl":"16000","type":"生活","time":"0-3"}
{"rank":10,"title":"偷子","dzl":"58.1","bfl":"699.6","zfl":"20000","type":"搞笑","time":"10+"}
{"rank":37,"title":"感谢还有读书这条路能让我摆脱这样的原生家庭","dzl":"11.2","bfl":"162.4","zfl":"13000","type":"生活","time":"10+"}
{"rank":11,"title":"“当 代 热 门 游 戏 现 状”","dzl":"26.3","bfl":"205.0","zfl":"2511","type":"游戏","time":"0-3"}
{"rank":95,"title":"欧洲资本积累到底有多肮脏?揭秘罪恶的黑奴贸易历史书为何只字不提?","dzl":"8.1","bfl":"77.8","zfl":"3752","type":"生活","time":"10+"}
{"rank":38,"title":"永不停息的制裁华为能否王者归来?","dzl":"16.0","bfl":"128.1","zfl":"3785","type":"知识","time":"8-10"}
{"rank":12,"title":"章鱼哥疯了","dzl":"10.2","bfl":"425.1","zfl":"1550","type":"游戏","time":"10+"}
{"rank":13,"title":"当你以游戏的方式打开军训","dzl":"18.6","bfl":"146.8","zfl":"11000","type":"生活","time":"8-10"}
{"rank":14,"title":"这下终于听懂秀才的小曲了","dzl":"17.7","bfl":"233.5","zfl":"9093","type":"搞笑","time":"0-3"}
{"rank":15,"title":"【毕导】这个视频里说的都是真的但你却永远无法证明","dzl":"32.3","bfl":"436.4","zfl":"35000","type":"知识","time":"10+"}
{"rank":16,"title":"【冰冰vlog.011】总要在烟雨青天去趟江南吧","dzl":"15.7","bfl":"150.3","zfl":"15000","type":"生活","time":"8-10"}
{"rank":17,"title":"【深度科普】长期摆烂如何恢复体能?","dzl":"9.9","bfl":"135.9","zfl":"12000","type":"运动","time":"5-8"}
{"rank":18,"title":"恋与提瓦特空桃篇:谁让你是我命定的另一半","dzl":"16.3","bfl":"122.1","zfl":"9901","type":"动画","time":"3-5"}
{"rank":19,"title":"帮唐山一位身残志坚的大姐卖冰激凌","dzl":"19.6","bfl":"134.2","zfl":"2845","type":"生活","time":"3-5"}
{"rank":20,"title":"350元一桶巨型泡面!新晋“天价网红泡面”值不值???","dzl":"13.5","bfl":"270.4","zfl":"1517","type":"美食","time":"10+"}
{"rank":21,"title":"深夜抢救我爸被喷一脸血怕得病猛搓脸找药吃","dzl":"21.1","bfl":"239.2","zfl":"1103","type":"生活","time":"5-8"}
{"rank":22,"title":"新铁锅怎么开锅更好用酒店大厨教你诀窍保证不锈不粘锅","dzl":"22.2","bfl":"425.3","zfl":"7099","type":"美食","time":"0-3"}
{"rank":94,"title":"九小时畅享听完一整本书不是一剪梅!书荒丨小说","dzl":"2.0","bfl":"119.8","zfl":"88","type":"动画","time":"10+"}
{"rank":23,"title":"潮汕大妈在北美说唱圈里的生意经","dzl":"21.9","bfl":"417.8","zfl":"1106","type":"生活","time":"3-5"}
{"rank":93,"title":"再次出发","dzl":"13.7","bfl":"104.6","zfl":"3630","type":"知识","time":"0-3"}
{"rank":24,"title":"万人雪书的长视频终于来啦!架不住你们天天催啊花了好多时间呜呜呜~","dzl":"25.6","bfl":"376.5","zfl":"4181","type":"生活","time":"8-10"}
{"rank":92,"title":"2023年8月热歌榜TOP50今夕是何年?难以置信这是2023年?","dzl":"6.7","bfl":"92.1","zfl":"3226","type":"音乐","time":"10+"}
{"rank":25,"title":"《明日方舟》SideStory「不义之财」活动宣传PV","dzl":"14.1","bfl":"172.7","zfl":"36000","type":"游戏","time":"5-8"}
{"rank":26,"title":"当中二青年来买单!(猜不到结局版)","dzl":"22.3","bfl":"235.5","zfl":"18000","type":"影视","time":"3-5"}
{"rank":91,"title":"日本排海为何中国网友互喷?日本政府是如何正当化排海的?","dzl":"6.4","bfl":"70.7","zfl":"7570","type":"知识","time":"10+"}
{"rank":90,"title":"她似了又活了。她没似他似了所以她也似了。她又活了可他们似了所以她又似了。","dzl":"8.0","bfl":"46.5","zfl":"7960","type":"搞笑","time":"3-5"}
{"rank":28,"title":"宿舍规则怪谈","dzl":"27.1","bfl":"205.1","zfl":"5178","type":"搞笑","time":"10+"}
{"rank":89,"title":"光头强家里捡的","dzl":"0.4","bfl":"320.9","zfl":"14","type":"动画","time":"8-10"}
{"rank":88,"title":"全网首拆!麒麟 5G 确认回归 | 华为 Mate 60 Pro","dzl":"56.2","bfl":"703.5","zfl":"90000","type":"知识","time":"10+"}
{"rank":29,"title":"穷小子强行与富少爷交换人生本想荣华富贵哪知少爷家里更是离谱!","dzl":"17.7","bfl":"288.5","zfl":"2355","type":"动画","time":"3-5"}
{"rank":87,"title":"我精神状态完全没有出问题的啦!!!","dzl":"16.4","bfl":"127.9","zfl":"4622","type":"动画","time":"0-3"}
{"rank":30,"title":"一个疯子却获得了诺贝尔奖真实故事改编高分电影《美丽心灵》","dzl":"12.5","bfl":"329.1","zfl":"3109","type":"影视","time":"10+"}
{"rank":86,"title":"最诡异的一局","dzl":"11","bfl":"101.4","zfl":"1702","type":"游戏","time":"3-5"}
{"rank":85,"title":"拥有几十台能上路的经典老车是什么样的体验?","dzl":"9.8","bfl":"163.4","zfl":"1669","type":"知识","time":"3-5"}
{"rank":31,"title":"这款游戏出现在21世纪还是太迷幻了","dzl":"12.5","bfl":"114.6","zfl":"1791","type":"游戏","time":"10+"}
{"rank":84,"title":"史上最好的原神模组!!(boss篇)","dzl":"8.7","bfl":"104.8","zfl":"8998","type":"游戏","time":"5-8"}
{"rank":32,"title":"关于下半身的生理知识没人告诉你这些!|拉撒保姆级教程","dzl":"10.9","bfl":"100.1","zfl":"7921","type":"知识","time":"10+"}
{"rank":83,"title":"努力有错吗?学习有罪吗?高考没有一个韩国人敢退出的战争","dzl":"8.2","bfl":"168.7","zfl":"9989","type":"知识","time":"10+"}
{"rank":33,"title":"宝可梦日语OP大全【挪威小哥Pellek|中字】","dzl":"11.7","bfl":"77.4","zfl":"3124","type":"音乐","time":"8-10"}
{"rank":82,"title":"不同年代的上班族白领都是怎么办公的?最后真的别演我工作状态哈哈哈哈!","dzl":"15.6","bfl":"147.5","zfl":"1688","type":"生活","time":"0-3"}
{"rank":81,"title":"我们分手了","dzl":"8.1","bfl":"241.4","zfl":"7008","type":"运动","time":"5-8"}
{"rank":39,"title":"老鼠们每天用捕鼠夹健身只为发动鼠界第三次世界大战","dzl":"19.2","bfl":"627.6","zfl":"10000","type":"影视","time":"10+"}
{"rank":40,"title":"大意了!这些不是全国统一的","dzl":"17.0","bfl":"219.7","zfl":"1325","type":"生活","time":"5-8"}
{"rank":41,"title":"青春没有售价米哈游贴脸开大","dzl":"28.6","bfl":"147.1","zfl":"29000","type":"游戏","time":"0-3"}
{"rank":42,"title":"早自习补作业事件","dzl":"29.0","bfl":"331.6","zfl":"5866","type":"影视","time":"0-3"}
{"rank":43,"title":"凌晨12.00教100个仙人掌唱歌没想到邻居找上门来了","dzl":"10.7","bfl":"330.0","zfl":"11000","type":"生活","time":"3-5"}
{"rank":44,"title":"这一次我们重新定义砖块","dzl":"24.7","bfl":"309.7","zfl":"5792","type":"生活","time":"0-3"}
{"rank":45,"title":"抗塔两分钟还是满血我没开玩笑!","dzl":"12.9","bfl":"279.4","zfl":"2197","type":"游戏","time":"5-8"}
{"rank":67,"title":"世界纪录保持者的拍摄间该怎么提升?","dzl":"9.3","bfl":"117.2","zfl":"2266","type":"知识","time":"10+"}
{"rank":68,"title":"一口气看完海贼王真人版第1季!1-8集!真人版符合你的预期吗?","dzl":"4.0","bfl":"238.6","zfl":"11000","type":"影视","time":"10+"}
{"rank":69,"title":"叶问之摆烂宗师","dzl":"9.2","bfl":"139.3","zfl":"10000","type":"搞笑","time":"5-8"}
{"rank":70,"title":"甲方花钱助我出道","dzl":"10.5","bfl":"78.4","zfl":"4665","type":"生活","time":"3-5"}
{"rank":71,"title":"秀 才 小 曲 最 骚 版 本","dzl":"12.9","bfl":"144.9","zfl":"3663","type":"搞笑","time":"0-3"}
{"rank":72,"title":"还原一下著名的《潘博文事件》","dzl":"21.8","bfl":"499.1","zfl":"5908","type":"生活","time":"3-5"}
{"rank":73,"title":"手持烟火以谋生 心怀诗意以谋爱","dzl":"15.3","bfl":"120.8","zfl":"5448","type":"动画","time":"0-3"}
{"rank":74,"title":"大家散伙!唐僧闹分手悟空为何大开杀戒?","dzl":"6.8","bfl":"101.4","zfl":"2224","type":"影视","time":"10+"}
{"rank":75,"title":"数学揭示万物密码 当然这还不够我们要去探访一下永恒。","dzl":"8.4","bfl":"132.2","zfl":"1487","type":"知识","time":"5-8"}
{"rank":76,"title":"如果2077的CV来配音《赛博朋克:边缘行者》(第八话)","dzl":"7.5","bfl":"57.5","zfl":"4243","type":"动画","time":"10+"}
{"rank":77,"title":"自己做的游戏终于发布了!赚了多少钱!?","dzl":"17.6","bfl":"155.7","zfl":"3834","type":"游戏","time":"0-3"}
{"rank":78,"title":"《要有自己的颜色》","dzl":"21.2","bfl":"199.9","zfl":"1782","type":"生活","time":"0-3"}
{"rank":79,"title":"买一块几十斤巨大安格斯上脑又被坑惨了涮麻辣火锅却爽翻了","dzl":"11.2","bfl":"136.6","zfl":"803","type":"美食","time":"8-10"}
{"rank":80,"title":"鹦鹉螺:我不是活化石","dzl":"14.3","bfl":"199.4","zfl":"1950","type":"知识","time":"10+"}
{"rank":27,"title":"【洛天依游学记原创曲】歌行四方 | AI歌手X非遗音乐","dzl":"10.4","bfl":"200.7","zfl":"5512","type":"音乐","time":"3-5"}
{"rank":46,"title":"我居然穿越回了10年前的B站!!!","dzl":"7.7","bfl":"116.6","zfl":"3811","type":"生活","time":"8-10"}
{"rank":47,"title":"陈Sir的工作日","dzl":"13.5","bfl":"110.8","zfl":"26000","type":"生活","time":"0-3"}
{"rank":48,"title":"干嘛啊","dzl":"24.0","bfl":"266.1","zfl":"7128","type":"生活","time":"0-3"}
{"rank":49,"title":"你看你又不敢对峙命运了吧!!!","dzl":"14.0","bfl":"97.8","zfl":"696","type":"游戏","time":"3-5"}
{"rank":50,"title":"我花1万块重庆打车到上海却被全国网友说成老赖","dzl":"15.3","bfl":"140.3","zfl":"10000","type":"生活","time":"10+"}
{"rank":51,"title":"摸 气 挑 战 (2)","dzl":"26.3","bfl":"247.7","zfl":"9562","type":"生活","time":"0-3"}
{"rank":52,"title":"仙人揉腹操九式详解版!做完大拉特拉","dzl":"7.9","bfl":"69.7","zfl":"14000","type":"运动","time":"0-3"}
{"rank":53,"title":"看着徒弟一点点长大逝去才发现长生是苦【我和徒弟03】","dzl":"5.6","bfl":"150.4","zfl":"618","type":"动画","time":"10+"}
{"rank":54,"title":"祝妹妹一路顺风","dzl":"25.4","bfl":"170.8","zfl":"18000","type":"生活","time":"0-3"}
{"rank":55,"title":"300w粉丝特别节目!拍了一些大家想看的但是也太怪了?","dzl":"12.3","bfl":"66.5","zfl":"710","type":"知识","time":"5-8"}
{"rank":56,"title":"爆肝几个星期我建出了最细节的海岛小镇!!!","dzl":"24.3","bfl":"214.8","zfl":"5545","type":"游戏","time":"5-8"}
{"rank":57,"title":"让巴黎看到国人的美","dzl":"24.4","bfl":"186.1","zfl":"784","type":"生活","time":"0-3"}
{"rank":58,"title":"村里来了新成员漠叔好心劝上学做社会有用的人","dzl":"11.9","bfl":"119.5","zfl":"1510","type":"美食","time":"10+"}
{"rank":59,"title":"《原神》角色演示-「菲米尼:海露幽响」","dzl":"11.7","bfl":"110.7","zfl":"7387","type":"游戏","time":"5-8"}
{"rank":61,"title":"当你碰到经验就会「瞬间暴毙」!!?","dzl":"10.0","bfl":"105.6","zfl":"554","type":"游戏","time":"10+"}
{"rank":62,"title":"大学开学时的各种人|大学学习","dzl":"9.0","bfl":"294.9","zfl":"224","type":"搞笑","time":"3-5"}
{"rank":63,"title":"都什么年代谁还用传统方式结义?!!","dzl":"9.0","bfl":"60.6","zfl":"3278","type":"搞笑","time":"10+"}
{"rank":64,"title":"【闽南婚宴】中式流水席天花板吃过一辈子忘不掉。","dzl":"39.1","bfl":"393.2","zfl":"6.5","type":"美食","time":"10+"}
{"rank":65,"title":"口腔溃疡为什么是白色的?","dzl":"18.1","bfl":"318.9","zfl":"3562","type":"知识","time":"3-5"}
{"rank":66,"title":"我今年拼过最牛的积木!(上)","dzl":"7.8","bfl":"172.8","zfl":"8298","type":"动画","time":"10+"}
{"rank":60,"title":"【太君の噩梦】打服日本一战成名比你想象的更夸张!苏联军神朱可夫","dzl":"10.7","bfl":"130.1","zfl":"1218","type":"知识","time":"10+"}
上传原始数据
hdfs dfs -mkdir -p /video/data
hdfs dfs -put data.json /video/data
hdfs dfs -cat /video/data/data.json
视频类别占比
json数据格式
[{"name": "影视","value": 9},{"name": "搞笑","value": 14},{"name": "美食","value": 7},{"name": "生活","value": 25},{"name": "知识","value": 16},{"name": "动画","value": 10},{"name": "游戏","value": 13},{"name": "运动","value": 3},{"name": "音乐","value": 3}
]
视频时长占比
json数据格式
[{"name": "3-5","value": 19},{"name": "5-8","value": 16},{"name": "10+","value": 34},{"name": "0-3","value": 23},{"name": "8-10","value": 8}
]
视频类别播放量和点赞量
json数据格式
{"dzlList": ["195.0","362.6","154.7","458.7","252.5","103.4","201.9","25.9","28.8"],"bflList": ["3009.1","3506.5","1691.9","5348.0","3108.9","1605.0","2234.7","447.0","370.2"],"catList": ["影视","搞笑","美食","生活","知识","动画","游戏","运动","音乐"]
}
视频类别点赞率
json数据格式
{"catList": ["影视","搞笑","美食","生活","知识","动画","游戏","运动","音乐"],"dzpList": ["6.48","10.34","9.14","8.58","8.12","6.44","9.03","5.79","7.78"]
}
在HDFS上准备业务数据
vim data.json
hdfs dfs -mkdir -p /video/data
hdfs dfs -put data.json /video/data/data.json
hdfs dfs -cat /video/data/data.json
数据清洗
原始数据 video.json 格式
{"rank":1,"title":"《逃出大英博物馆》第二集","dzl":"77.8","bfl":"523.9","zfl":"39000","type":"影视","time":"3-5"}
清洗后数据 video.csv 格式
1,《逃出大英博物馆》第二集,77.8,523.9,39000,影视,3-5
javabean
com.lihaozhe.video.Video
package com.lihaozhe.video;import lombok.*;/*** 数据清洗的序列化类** @author 李昊哲* @version 1.0* @create 2023-11-8*/
@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class Video {
    // Field order is kept exactly as declared originally: the all-args
    // constructor generated by Lombok depends on it.

    /** Video number (the "rank" field of a raw record). */
    private int rank;

    /** Video title. */
    private String title;

    /** Like count, stored as the raw string taken from the source record. */
    private String dzl;

    /** Play count, stored as the raw string taken from the source record. */
    private String bfl;

    /** Forward (share) count, stored as the raw string taken from the source record. */
    private String zfl;

    /** Video category. */
    private String type;

    /** Video duration bucket, e.g. "3-5" or "10+" (unit not stated in this file). */
    private String time;

    /**
     * Renders all seven fields in declaration order, separated by tab characters.
     *
     * @return the tab-separated representation; a null field is rendered as "null"
     */
    @Override
    public String toString() {
        return String.join("\t", String.valueOf(rank), title, dzl, bfl, zfl, type, time);
    }
}
ETL数据格式化
com.lihaozhe.video.DataFormat
package com.lihaozhe.video;import com.lihaozhe.util.json.jackson.JacksonUtils;
import com.lihaozhe.util.string.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.net.URI;/*** 原始 json 格式数据清洗为 csv 文件** @author 李昊哲* @version 1.0* @create 2023-11-9*/
public class DataFormat {/*** 原始数据清洗 mapper*/public static class DataFormatMapper extends Mapper<LongWritable, Text, Text, NullWritable> {// 将转换后的结果封装为输出的 keyText outKey = new Text();// 输出的 valueNullWritable outValue = NullWritable.get();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {// 读取文档一行内容String string = value.toString();// 将读取的内容转为 Video 对象Video video = JacksonUtils.json2bean(string, Video.class);try {// 将 对象 转为 csv 文件 格式字符串String csv = StringUtils.format2CSV(video);// 将转换后的结果封装为输出的 keyoutKey.set(csv);// 输出清洗的数据context.write(outKey, outValue);} catch (IllegalAccessException e) {throw new RuntimeException(e);}}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 设置环境变量 hadoop 用户名 为 rootSystem.setProperty("HADOOP_USER_NAME", "root");// 参数配置对象Configuration conf = new Configuration();// 跨平台提交conf.set("mapreduce.app-submission.cross-platform", "true");// 本地运行// conf.set("mapreduce.framework.name", "local");// 设置默认文件系统为 本地文件系统// conf.set("fs.defaultFS", "file:///");// 声明Job对象 就是一个应用Job job = Job.getInstance(conf, "video data format");// 指定当前Job的驱动类// 本地提交 注释该行job.setJarByClass(DataFormat.class);// 本地提交启用该行// job.setJar("D:\\work\\河南师范大学\\2023\\bigdata2023\\Hadoop\\code\\hadoop\\target\\hadoop.jar");// 指定当前Job的 Mapperjob.setMapperClass(DataFormatMapper.class);// 指定当前Job的 Combiner 注意:一定不能影响最终计算结果 否则 不使用// job.setCombinerClass(WordCountReduce.class);// 指定当前Job的 Reducer// job.setReducerClass(WordCountReduce.class);// 设置 reduce 数量为 零job.setNumReduceTasks(0);// 设置 map 输出 key 的数据类型job.setMapOutputValueClass(Text.class);// 设置 map 输出 value 的数据类型job.setMapOutputValueClass(NullWritable.class);// 设置最终输出 key 的数据类型// job.setOutputKeyClass(Text.class);// 设置最终输出 value 的数据类型// job.setOutputValueClass(NullWritable.class);// 定义 map 输入的路径 注意:该路径默认为hdfs路径FileInputFormat.addInputPath(job, new 
Path("/video/data/data.json"));// 定义 reduce 输出数据持久化的路径 注意:该路径默认为hdfs路径Path dst = new Path("/video/ods");// 保护性代码 如果 reduce 输出目录已经存在则删除 输出目录DistributedFileSystem dfs = new DistributedFileSystem();String nameService = conf.get("dfs.nameservices");String hdfsRPCUrl = "hdfs://" + nameService + ":" + 8020;dfs.initialize(URI.create(hdfsRPCUrl), conf);if (dfs.exists(dst)) {dfs.delete(dst, true);}// FileSystem fs = FileSystem.get(conf);
// if (fs.exists(dst)) {
// fs.delete(dst, true);
// }FileOutputFormat.setOutputPath(job, dst);// 提交 job// job.submit();System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
项目打包上传测试
项目打包
mvn package
上传 jar 文件到集群
scp hadoop.jar root@spark01:
在集群上提交 job
hadoop jar hadoop.jar com.lihaozhe.video.DataFormat
查看 job 执行结果
hdfs dfs -cat /video/ods/part-m-00000
视频占比代码
mapreduce统计
com.lihaozhe.video.CategoryPercentJob
package com.lihaozhe.video;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.net.URI;/*** @author 李昊哲* @version 1.0* @create 2023-11-9*/
public class CategoryPercentJob {public static class CategoryPercentMapper extends Mapper<LongWritable, Text, Text, IntWritable> {// 输出的 keyText outKey = new Text();// 输出的 valueIntWritable outValue = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {// 读取文档一行内容 使用逗号分隔将 内容转为字符串数组String[] split = value.toString().split(",");// 获取字符串数组下标为5的元素封装为输出的keyoutKey.set(split[5]);// 输出数据context.write(outKey, outValue);}}public static class CategoryPercentReducer extends Reducer<Text, IntWritable, Text, IntWritable> {// 输出的 valueIntWritable outValue = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable value : values) {sum += value.get();}outValue.set(sum);context.write(key, outValue);}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 设置环境变量 hadoop 用户名 为 rootSystem.setProperty("HADOOP_USER_NAME", "root");// 参数配置对象Configuration conf = new Configuration();// 跨平台提交conf.set("mapreduce.app-submission.cross-platform", "true");// 本地运行// conf.set("mapreduce.framework.name", "local");// 设置默认文件系统为 本地文件系统// conf.set("fs.defaultFS", "file:///");// 声明Job对象 就是一个应用Job job = Job.getInstance(conf, "category percent");// 指定当前Job的驱动类// 本地提交 注释该行job.setJarByClass(CategoryPercentJob.class);// 本地提交启用该行// job.setJar("D:\\work\\河南师范大学\\2023\\bigdata2023\\Hadoop\\code\\hadoop\\target\\hadoop.jar");// 指定当前Job的 Mapperjob.setMapperClass(CategoryPercentMapper.class);// 指定当前Job的 Combiner 注意:一定不能影响最终计算结果 否则 不使用job.setCombinerClass(CategoryPercentReducer.class);// 指定当前Job的 Reducerjob.setReducerClass(CategoryPercentReducer.class);// 设置 reduce 数量为 零// job.setNumReduceTasks(0);// 设置 map 输出 key 的数据类型job.setMapOutputValueClass(Text.class);// 设置 map 输出 value 
的数据类型job.setMapOutputValueClass(IntWritable.class);// 设置最终输出 key 的数据类型job.setOutputKeyClass(Text.class);// 设置最终输出 value 的数据类型job.setOutputValueClass(IntWritable.class);// 定义 map 输入的路径 注意:该路径默认为hdfs路径FileInputFormat.addInputPath(job, new Path("/video/ods/part-m-00000"));// 定义 reduce 输出数据持久化的路径 注意:该路径默认为hdfs路径Path dst = new Path("/video/dwd");// 保护性代码 如果 reduce 输出目录已经存在则删除 输出目录DistributedFileSystem dfs = new DistributedFileSystem();String nameService = conf.get("dfs.nameservices");String hdfsRPCUrl = "hdfs://" + nameService + ":" + 8020;dfs.initialize(URI.create(hdfsRPCUrl), conf);if (dfs.exists(dst)) {dfs.delete(dst, true);}// FileSystem fs = FileSystem.get(conf);
// if (fs.exists(dst)) {
// fs.delete(dst, true);
// }FileOutputFormat.setOutputPath(job, dst);// 提交 job// job.submit();System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
项目打包上传测试
项目打包
mvn package
上传 jar 文件到集群
scp hadoop.jar root@spark01:
在集群上提交 job
hadoop jar hadoop.jar com.lihaozhe.video.CategoryPercentJob
查看 job 执行结果
hdfs dfs -cat /video/dwd/part-r-00000
数据写入数据库
创建数据表
create database htu;
use htu;
create table category_percent(name varchar(50) comment '视频类别名称',value int unsigned comment '视频数量'
) comment '视频类别占比信息表';
javabean
package com.lihaozhe.video;import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class CategoryPercent implements DBWritable {/*** 视频类型*/private String name;/*** 视频数量*/private int value;@Overridepublic String toString() {return name + "\t" + value;}@Overridepublic void write(PreparedStatement pst) throws SQLException {pst.setString(1, this.name);pst.setInt(2, this.value);}@Overridepublic void readFields(ResultSet rs) throws SQLException {this.name = rs.getString(1);this.value = rs.getInt(2);}
}
job类
com.lihaozhe.video.CategoryPercentWriteDB
package com.lihaozhe.video;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.net.URI;/*** @author 李昊哲* @version 1.0* @create 2023-11-9*/
public class CategoryPercentWriteDB {public static class CategoryPercentMapper extends Mapper<LongWritable, Text, CategoryPercent, NullWritable> {/*** 输出的 key*/CategoryPercent outKey = new CategoryPercent();/*** 输出的 value*/NullWritable outValue = NullWritable.get();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CategoryPercent, NullWritable>.Context context) throws IOException, InterruptedException {String[] split = value.toString().split("\t");outKey.setName(split[0]);outKey.setValue(Integer.parseInt(split[1]));context.write(outKey, outValue);}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 设置环境变量 hadoop 用户名 为 rootSystem.setProperty("HADOOP_USER_NAME", "root");// 参数配置对象Configuration conf = new Configuration();// 配置JDBC 参数DBConfiguration.configureDB(conf,"com.mysql.cj.jdbc.Driver","jdbc:mysql://spark03:3306/htu?useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=UTF8&useSSL=false&serverTimeZone=Asia/Shanghai","root", "Lihaozhe!!@@1122");// 跨平台提交conf.set("mapreduce.app-submission.cross-platform", "true");// 本地运行// conf.set("mapreduce.framework.name", "local");// 设置默认文件系统为 本地文件系统// conf.set("fs.defaultFS", "file:///");// 声明Job对象 就是一个应用Job job = Job.getInstance(conf, "category percent write db");// 指定当前Job的驱动类// 本地提交 注释该行job.setJarByClass(CategoryPercentWriteDB.class);// 本地提交启用该行// job.setJar("D:\\work\\河南师范大学\\2023\\bigdata2023\\Hadoop\\code\\hadoop\\target\\hadoop.jar");// 指定当前Job的 Mapperjob.setMapperClass(CategoryPercentMapper.class);// 指定当前Job的 Combiner 注意:一定不能影响最终计算结果 否则 不使用// job.setCombinerClass(WordCountReduce.class);// 指定当前Job的 Reducer// job.setReducerClass(WordCountReduce.class);// 设置 reduce 数量为 零job.setNumReduceTasks(0);// 设置 map 输出 key 的数据类型job.setMapOutputValueClass(CategoryPercent.class);// 设置 map 输出 value 的数据类型job.setMapOutputValueClass(NullWritable.class);// 设置最终输出 key 的数据类型// job.setOutputKeyClass(Text.class);// 设置最终输出 value 的数据类型// 
job.setOutputValueClass(NullWritable.class);// 定义 map 输入的路径 注意:该路径默认为hdfs路径FileInputFormat.addInputPath(job, new Path("/video/dwd/part-r-00000"));// 定义 reduce 输出数据持久化的路径 注意:该路径默认为hdfs路径
// Path dst = new Path("/video/ods");
// // 保护性代码 如果 reduce 输出目录已经存在则删除 输出目录
// DistributedFileSystem dfs = new DistributedFileSystem();
// String nameService = conf.get("dfs.nameservices");
// String hdfsRPCUrl = "hdfs://" + nameService + ":" + 8020;
// dfs.initialize(URI.create(hdfsRPCUrl), conf);
// if (dfs.exists(dst)) {
// dfs.delete(dst, true);
// }// FileSystem fs = FileSystem.get(conf);
// if (fs.exists(dst)) {
// fs.delete(dst, true);
// }
// FileOutputFormat.setOutputPath(job, dst);// 设置输出类job.setOutputFormatClass(DBOutputFormat.class);// 配置将数据写入表DBOutputFormat.setOutput(job, "category_percent", "name", "value");// 提交 job// job.submit();System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
项目打包上传测试
项目打包
mvn package
上传 jar 文件到集群
scp hadoop.jar root@spark01:
集群提交 job
hadoop jar hadoop.jar com.lihaozhe.video.CategoryPercentWriteDB
遇到以下报错,原因是 hadoop集群上没有 MySQL驱动
Error: java.io.IOException: com.mysql.cj.jdbc.Driver
    at org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.getRecordWriter(DBOutputFormat.java:197)
    at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.<init>(MapTask.java:660)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:780)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:348)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:178)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:172)
上传 MySQL 驱动到 $HADOOP_HOME/share/hadoop/common/lib
mysql-connector-j-8.0.33.jar
protobuf-java-3.5.1.jar
将 MySQL 驱动同步到集群其它节点
scp $HADOOP_HOME/share/hadoop/common/lib/mysql-connector-j-8.0.33.jar root@spark02:$HADOOP_HOME/share/hadoop/common/lib/
scp $HADOOP_HOME/share/hadoop/common/lib/mysql-connector-j-8.0.33.jar root@spark03:$HADOOP_HOME/share/hadoop/common/lib/
推荐重启集群后重新提交job
hadoop jar hadoop.jar com.lihaozhe.video.CategoryPercentWriteDB
数据传输
创建数据表
create database htu_view;
use htu_view;
create table category_percent(name varchar(50) comment '视频类别名称',value int unsigned comment '视频数量'
) comment '视频类别占比信息表';
jdbc
package com.lihaozhe.video;import com.mysql.cj.jdbc.Driver;import java.sql.*;
import java.util.ArrayList;
import java.util.List;/*** @author 李昊哲* @version 1.0* @create 2023-11-9*/
public class CategoryPercentCopyDB {private final static String user = "root";private final static String passowrd = "Lihaozhe!!@@1122";static {// DriverManager.registerDriver(new Driver());try {Class.forName("com.mysql.cj.jdbc.Driver");} catch (ClassNotFoundException e) {throw new RuntimeException(e);}}public static void main(String[] args) throws SQLException {List<CategoryPercent> list = select();insert(list);System.out.println("数据同步完成");}public static List<CategoryPercent> select() throws SQLException {// 保存结果集String url = "jdbc:mysql://spark03:3306/htu?useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=UTF8&useSSL=false&serverTimeZone=Asia/Shanghai";List<CategoryPercent> list = new ArrayList<>();Connection conn = DriverManager.getConnection(url, user, passowrd);String sql = "select name, value from category_percent";PreparedStatement pst = conn.prepareStatement(sql);ResultSet rs = pst.executeQuery();CategoryPercent categoryPercent = new CategoryPercent();while (rs.next()) {categoryPercent.setName(rs.getString("name"));categoryPercent.setValue(rs.getInt("value"));list.add(categoryPercent);}rs.close();pst.close();conn.close();// 返回结果集return list;}public static void insert(List<CategoryPercent> list) throws SQLException {// 保存结果集String url = "jdbc:mysql://spark03:3306/htu_view?useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=UTF8&useSSL=false&serverTimeZone=Asia/Shanghai";Connection conn = DriverManager.getConnection(url, user, passowrd);// 开启事务conn.setAutoCommit(false);String sql = "insert into category_percent (name,value) values (?,?)";PreparedStatement pst = conn.prepareStatement(sql);int i = 0;for (CategoryPercent categoryPercent : list) {pst.setString(1, categoryPercent.getName());pst.setInt(2, categoryPercent.getValue());pst.executeUpdate();i++;}if (i > 0) {conn.commit();}pst.close();conn.close();}
}
数据可视化
获取json格式字符串
package com.lihaozhe.video;import com.lihaozhe.util.json.jackson.JacksonUtils;import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;/*** @author 李昊哲* @version 1.0* @create 2023-11-9*/
public class CategoryPercentReadDB2Json {
    /**
     * Reads all rows of {@code category_percent} from the {@code htu_view}
     * database, serialises them to JSON, prints the JSON and writes it to
     * {@code category_percent.json} in the working directory.
     *
     * @param args command-line arguments (not used)
     * @throws SQLException           if the query fails
     * @throws ClassNotFoundException if the MySQL JDBC driver is not on the classpath
     */
    public static void main(String[] args) throws SQLException, ClassNotFoundException {
        Class.forName("com.mysql.cj.jdbc.Driver");
        String url = "jdbc:mysql://spark03:3306/htu_view?useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=UTF8&useSSL=false&serverTimeZone=Asia/Shanghai";
        // NOTE(review): the database credentials are hard-coded here; consider
        // supplying them from outside the source before sharing this code.
        String user = "root";
        String password = "Lihaozhe!!@@1122";
        String sql = "select name, value from category_percent";

        List<CategoryPercent> list = new ArrayList<>();
        // try-with-resources closes the result set, statement and connection
        // even when reading fails part-way through.
        try (Connection conn = DriverManager.getConnection(url, user, password);
             PreparedStatement pst = conn.prepareStatement(sql);
             ResultSet rs = pst.executeQuery()) {
            while (rs.next()) {
                // A new object per row: reusing a single instance would leave
                // every list entry pointing at the values of the last row.
                CategoryPercent categoryPercent = new CategoryPercent();
                categoryPercent.setName(rs.getString("name"));
                categoryPercent.setValue(rs.getInt("value"));
                list.add(categoryPercent);
            }
        }

        String json = JacksonUtils.bean2json(list);
        System.out.println(json);
        // Explicit check: a plain assert is skipped unless the JVM runs with -ea.
        if (json == null) {
            throw new IllegalStateException("JSON serialisation of category_percent returned null");
        }
        try {
            // Encode explicitly as UTF-8 so the Chinese category names do not
            // depend on the platform default charset.
            Files.write(Paths.get("category_percent.json"), json.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        System.out.println("读取完成");
    }
}