二百五十九、Java——采集Kafka数据,解析成一条条数据,写入另一Kafka中(一般JSON)

一、目的

由于部分数据类型频率为1s,从而数据规模特别大,因此完整的JSON放在Hive中解析起来,尤其是在单机环境下,效率特别慢,无法满足业务需求。

而Flume的拦截器并不能很好的转换数据,因为只能采用Java方式,从Kafka的主题A中采集数据,并解析字段,然后写入到放在Kafka主题B中

二 、原始数据格式

JSON格式比较正常,对象中包含数组

{
    "deviceNo": "39",
    "sourceDeviceType": null,
    "sn": null,
    "model": null,
    "createTime": "2024-09-03 14:10:00",
    "data": {
        "cycle": 300,
        "evaluationList": [{
            "laneNo": 1,
            "laneType": null,
            "volume": 3,
            "queueLenMax": 11.43,
            "sampleNum": 0,
            "stopAvg": 0.54,
            "delayAvg": 0.0,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        },
        {
            "laneNo": 2,
            "laneType": null,
            "volume": 7,
            "queueLenMax": 23.18,
            "sampleNum": 0,
            "stopAvg": 0.47,
            "delayAvg": 10.57,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        },
        {
            "laneNo": 3,
            "laneType": null,
            "volume": 9,
            "queueLenMax": 11.54,
            "sampleNum": 0,
            "stopAvg": 0.18,
            "delayAvg": 9.67,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        },
        {
            "laneNo": 4,
            "laneType": null,
            "volume": 6,
            "queueLenMax": 11.36,
            "sampleNum": 0,
            "stopAvg": 0.27,
            "delayAvg": 6.83,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        }]
    }
}

三、Java代码

package com.kgc;import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaKafkaEvaluation {// 添加 Kafka Producer 配置private static Properties producerProps() {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.70:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.ACKS_CONFIG, "-1");props.put(ProducerConfig.RETRIES_CONFIG, "3");props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");props.put(ProducerConfig.LINGER_MS_CONFIG, "1");props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");return props;}public static void main(String[] args) {Properties prop = new Properties();prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.70:9092");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 每一个消费,都要定义不同的Group_IDprop.put(ConsumerConfig.GROUP_ID_CONFIG, "evaluation_group");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);consumer.subscribe(Collections.singleton("topic_internal_data_evaluation"));ObjectMapper mapper = new ObjectMapper();// 初始化 Kafka ProducerKafkaProducer<String, String> producer = new KafkaProducer<>(producerProps());while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {try {JsonNode rootNode = mapper.readTree(record.value());System.out.println("原始数据"+rootNode);String device_no = rootNode.get("deviceNo").asText();String source_device_type = rootNode.get("sourceDeviceType").asText();String sn = rootNode.get("sn").asText();String model = rootNode.get("model").asText();String create_time = rootNode.get("createTime").asText();String cycle = rootNode.get("data").get("cycle").asText();JsonNode evaluationList = rootNode.get("data").get("evaluationList");for (JsonNode evaluationItem : evaluationList) {String lane_no = evaluationItem.get("laneNo").asText();String lane_type = evaluationItem.get("laneType").asText();String volume = evaluationItem.get("volume").asText();String queue_len_max = evaluationItem.get("queueLenMax").asText();String sample_num = evaluationItem.get("sampleNum").asText();String stop_avg = evaluationItem.get("stopAvg").asText();String delay_avg = evaluationItem.get("delayAvg").asText();String pass_rate = evaluationItem.get("passRate").asText();String travel_dist = evaluationItem.get("travelDist").asText();String travel_time_avg = evaluationItem.get("travelTimeAvg").asText();String outputLine = String.format("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s",device_no, source_device_type, sn, model, create_time, cycle,lane_no, lane_type,volume,queue_len_max,sample_num,stop_avg,delay_avg,pass_rate,travel_dist,travel_time_avg);// 发送数据到 KafkaProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic_db_data_evaluation", record.key(), outputLine);producer.send(producerRecord, (RecordMetadata metadata, Exception e) -> {if (e != null) {e.printStackTrace();} else {System.out.println("The offset of the record we just sent is: " + metadata.offset());}});}} catch (Exception e) {e.printStackTrace();}}consumer.commitAsync();}}}

1、服务器IP都是   192.168.0.70

2、消费Kafka主题(数据源):topic_internal_data_evaluation

3、生产Kafka主题(目标源):topic_db_data_evaluation

4、注意:字段顺序与ODS层表结构字段顺序一致!!!

四、开启Kafka主题topic_db_data_evaluation消费者

[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.0.70:9092  --topic topic_db_data_evaluation  --from-beginning

五、运行测试

1、启动项目

2、消费者输出数据

然后再用Flume采集写入HDFS就行了,不过ODS层表结构需要转变

六、ODS层新表结构

create external table  if not exists  hurys_dc_ods.ods_evaluation(device_no           string        COMMENT '设备编号',source_device_type  string        COMMENT '设备类型',sn                  string        COMMENT '设备序列号 ',model               string        COMMENT '设备型号',create_time         timestamp     COMMENT '创建时间',cycle               int           COMMENT '评价数据周期',lane_no             int           COMMENT '车道编号',lane_type           int           COMMENT '车道类型 0:渠化1:来向2:出口3:去向4:左弯待转区5:直行待行区6:右转专用道99:未定义车道',volume              int           COMMENT '车道内过停止线流量(辆)',queue_len_max       float         COMMENT '车道内最大排队长度(m)',sample_num          int           COMMENT '评价数据计算样本量',stop_avg            float         COMMENT '车道内平均停车次数(次)',delay_avg           float         COMMENT '车道内平均延误时间(s)',pass_rate           float         COMMENT '车道内一次通过率',travel_dist         float         COMMENT '车道内检测行程距离(m)',travel_time_avg     float         COMMENT '车道内平均行程时间'
)
comment '评价数据外部表——静态分区'
partitioned by (day string)
row format delimited fields terminated by ','
stored as SequenceFile
;

七、Flume采集配置文件

八、运行Flume任务,检查HDFS文件、以及ODS表数据

--刷新表分区
msck repair table ods_evaluation;
--查看表分区
show partitions hurys_dc_ods.ods_evaluation;
--查看表数据
select * from hurys_dc_ods.ods_evaluation
where day='2024-09-03';

搞定,这样就不需要在Hive中解析JSON数据了!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/418823.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

鸿蒙自动化发布测试版本app

创建API客户端 API客户端是AppGallery Connect用于管理用户访问AppGallery Connect API的身份凭据&#xff0c;您可以给不同角色创建不同的API客户端&#xff0c;使不同角色可以访问对应权限的AppGallery Connect API。在访问某个API前&#xff0c;必须创建有权访问该API的API…

UE5.3_跟一个插件—Socket.IO Client

网上看到这个插件,挺好! 项目目前也没有忙到不可开交,索性跟着测一下吧: 商城可见,售价72.61人民币! 但是,git上有仓库哦,免费!! 跟着链接先准备起来: Documentation: GitHub - getnamo/SocketIOClient-Unreal: Socket.IO client plugin for the Unreal Engin…

数据仓库理论知识

1、数据仓库的概念 数据仓库&#xff08;英文&#xff1a;Date Warehouse&#xff0c;简称数仓、DW&#xff09;&#xff0c;是一个用于数据存储、分析、报告的数据系统。数据仓库的建设目的是面向分析的集成化数据环境&#xff0c;其数据来源于不同的外部系统&#…

【H2O2|全栈】Markdown | Md 笔记到底如何使用?【前端 · HTML前置知识】

Markdown的一些杂谈 目录 Markdown的一些杂谈 前言 准备工作 认识.Md文件 为什么使用Md&#xff1f; 怎么使用Md&#xff1f; ​编辑 怎么看别人给我的Md文件&#xff1f; Md文件命令 切换模式 粗体、倾斜、下划线、删除线和荧光标记 分级标题 水平线 引用 无序…

缓存类型以及读写策略

缓存&#xff08;Cache&#xff09;是一种高效的数据存储技术&#xff0c;旨在提高数据访问速度。 它将频繁访问或最近使用的数据临时存储在更快速但较小的存储介质&#xff08;如内存&#xff09;中&#xff0c;以减少从较慢的存储设备&#xff08;如硬盘或远程服务器&#x…

4G模块、WIFI模块、NBIOT模块通过AT指令连接华为云物联网服务器(MQTT协议)

MQTT协议概述 MQTT&#xff08;Message Queuing Telemetry Transport&#xff09;是一种轻量级的消息传输协议&#xff0c;它被设计用来提供一对多的消息分发和应用之间的通讯&#xff0c;尤其适用于远程位置的设备和高延迟或低带宽的网络。MQTT协议基于客户端-服务器架构&…

iOS——方法交换Method Swizzing

什么是方法交换 Method Swizzing是发生在运行时的&#xff0c;主要用于在运行时将两个Method进行交换&#xff0c;我们可以将Method Swizzling代码写到任何地方&#xff0c;但是只有在这段Method Swilzzling代码执行完毕之后互换才起作用。 利用Objective-C Runtimee的动态绑定…

网络编程学习:TCP/IP协议

TCP/IP协议简介 TCP/IP协议包含了一系列的协议&#xff0c;也叫TCP/IP协议族&#xff08;TCP/IP Protocol Suite&#xff0c;或TCP/IP Protocols&#xff09;&#xff0c;简称TCP/IP。 分层结构 为了能够实现不同类型的计算机和不同类型的操作系统之间进行通信&#xff0c;引…

Zookeeper基本原理

1.什么是Zookeeper? Zookeeper是一个开源的分布式协调服务器框架&#xff0c;由Apache软件基金会开发&#xff0c;专为分布式系统设计。它主要用于在分布式环境中管理和协调多个节点之间的配置信息、状态数据和元数据。 Zookeeper采用了观察者模式的设计理念&#xff0c;其核心…

在vscode中用virtual env的方法

vscode是非常常用的软件开发工具。我们也非常了解如何使用vscode开发python的基本方法。当然&#xff0c;vscode可以开发基本所有编程语言。真的是又大又全又好用。 那么为什么要在vscode里面使用virtual env呢&#xff1f;因为python开发会遇到包管理的问题。而virtual env可…

Flutter 小技巧之 Row/Column 即将支持 Flex.spacing

事实上这是一个相当久远的话题&#xff0c;如果对于前因后果不管兴趣&#xff0c;直接看最后就行。 这个需求最早提及应该是 2018 年初在 #16957 被人提起&#xff0c;因为在 Flutter 上 Wrap 有 runSpacing 和 spacing 用于配置垂直和水平间距&#xff0c;而为什么 Colum 和 …

C++ | Leetcode C++题解之第392题判断子序列

题目&#xff1a; 题解&#xff1a; class Solution { public:bool isSubsequence(string s, string t) {int n s.size(), m t.size();vector<vector<int> > f(m 1, vector<int>(26, 0));for (int i 0; i < 26; i) {f[m][i] m;}for (int i m - 1; …

C语言第一周课

目录 1.程序设计的基本概念是什么?C语言是什么?算法是什么? 2.开发环境 3.第一个C语言程序 4.搭建VC6编译环境 5.实际操作,完成第一个C语言程序 6.体验在线编译环境 1.程序设计的基本概念是什么?C语言是什么?算法是什么? C语言是一种较早的程序设计语言&#xff0c…

Linux第十一节课 - 进程

一个程序从磁盘以文件的形式加载到内存之后&#xff0c;已经变成了进程&#xff01; 引入管理者和被管理者 1、管理者和被管理者不需要见面&#xff01;&#xff08;例如学生和校长&#xff01;&#xff09; 2、管理者在不见被管理者的情况下&#xff0c;如何做好管理呢&…

Windows下Python和PyCharm的应用(二)__快捷键方式的设定

前言 程序写久了&#xff0c;难免会形成自己的编程习惯。比如对某一套快捷键的使用&#xff0c;已经形成了肌肉记忆。 为了方便快捷键的使用&#xff0c;可以在PyCharm中设置自己喜欢的快捷键。 我比较习惯于微软Visual Studio的快捷键设置。&#xff08;因为早些年VC开发用的…

计算机网络与Internet应用

一、计算机网络 1.计算机网络的定义 网络定义&#xff1a;计算机网络是指将地理位置不同的具有独立功能的多台计算机及其外部设备&#xff0c;通过通信线路连接起来&#xff0c;在网络操作系统&#xff0c;网络管理软件及网络通信协议的管理和协调下&#xff0c;实现资源共享…

数据仓库技术选型方案文档

关联博客&#xff1a; 数据仓库技术选型方案文档 Flink CDC MySQL数据同步到Doris表同步配置生成工具类 新版报表系统&#xff08;明细报表、看板、数据大屏&#xff09;方案&介绍 文章目录 数据仓库技术选型背景现状现状架构目标架构业务反馈&痛点问题&#xff1a;原因…

QT定时器QObiect/QTimer

QT定时器 一、QObiect: startTimer ----------- killTimer 电子相册&#xff0c;利用定时器轮播图片 #include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent) :QWidget(parent),ui(new Ui::Widget) {ui->setupUi(this);picID …

【C++11】类的新功能

【C11】类的新功能 文章目录 【C11】类的新功能一、移动构造和移动赋值的特点二、类成员变量初始化三、强制生成默认函数的关键字default四、禁止生成默认函数的关键字delete五、继承和多态中的fifinal与override关键字 一、移动构造和移动赋值的特点 ​ 默认成员函数&#xf…

对一个已经运行的LabVIEW VI进行控制

要对一个已经运行的LabVIEW VI进行控制&#xff0c;可以采用多种方法&#xff0c;这取决于你想要控制的内容以及具体的应用场景。以下是几种常见的实现方式&#xff1a; 1. 使用全局变量或功能全局变量&#xff08;FGV&#xff09; 方法: 你可以创建全局变量或功能全局变量&am…