大数据-226 离线数仓 - Flume 优化配置 自定义拦截器 拦截原理 拦截器实现 Java

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!

目前开始更新 MyBatis,一起深入浅出!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(正在更新…)

章节内容

上节我们完成了如下的内容:

  • 需求分析 指标口径
  • 日志数据采集 taildir source HDFS Sink Agent Flume
  • 优化配置

在这里插入图片描述

Flume的优化配置

Flume 是一种分布式、可靠且高效的数据收集、聚合和传输系统,广泛应用于大数据生态系统中。为了提升 Flume 的性能和稳定性,优化配置至关重要。

使用如下的指令,启动Agent进行测试:

flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs1.conf -name a1 -Dflum
e.roog.logger=INFO,console

启动后的截图如下所示:
在这里插入图片描述

查看刚才的Flume窗口:
在这里插入图片描述

查看HDFS的内容:
在这里插入图片描述

批量处理

  • 参数:batchSize
  • 作用:控制 Flume 在批量传输时每次传输的事件数量。
    配置建议:
  • Source 到 Channel:根据 Source 的吞吐量和 Channel 的吞吐能力调整,推荐值为 100-1000。
  • Channel 到 Sink:根据 Sink 的处理能力和目标系统的写入性能调整,推荐值为 500-5000。

压缩传输

  • 参数:compressionType
  • 作用:对事件进行压缩后传输,减少网络带宽消耗。
  • 支持的压缩类型:gzip、snappy、lz4 等。
  • 配置建议:根据目标系统是否支持解压缩功能选择合适的压缩类型。

Source 优化

Taildir Source

  • 参数:batchSize 和 fileHeader
  • batchSize:设置单次从文件中读取的事件数量。
  • fileHeader:是否在事件头部添加文件名,推荐开启以便于后续处理。

Kafka Source

  • 参数:kafka.consumer.timeout.ms 和 fetch.message.max.bytes
  • kafka.consumer.timeout.ms:设置 Kafka 消费者读取数据的超时时间,通常为 100-500ms。
  • fetch.message.max.bytes:设置每次读取的最大消息大小,默认值通常为 1MB,可以根据业务场景适当调整。

Channel 优化

Memory Channel

  • 参数:capacity 和 transactionCapacity
  • capacity:Channel 中允许的最大事件数。
  • transactionCapacity:单次事务中允许的最大事件数。

File Channel

  • 参数:checkpointDir 和 dataDirs
  • checkpointDir:存储 Channel 状态的目录。
  • dataDirs:存储事件数据的目录,建议设置多个磁盘路径以提升 IO 性能。
  • 配置建议:确保磁盘 IO 性能足够,避免瓶颈。

Flume报错解决

向 logs 目录中存放入日志文件,此时如果出现OOM的日志,是因为缺省情况下FlumeJVM的最大分配20M,这个值太小,需要调整。
我这里直接放入:

vim /opt/wzk/logs/start/test.log2020-07-30 14:18:47.339 [main] INFO com.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"1","error_code":"0"},"time":1596111888529},"attr":{"area":"泰安","uid":"2F10092A9","app_v":"1.1.13","event_type":"common","device_id":"1FB872-9A1009","os_type":"4.7.3","channel":"DK","language":"chinese","brand":"iphone-9"}}

解决方案:
在 $FLUME_HOME/conf/flume-env.sh 中增加以下内容:

export JAVA_OPTS="-Xms4000m -Xmx4000m -
Dcom.sun.management.jmxremote"
# 要想使配置文件生效,还要在命令行中指定配置文件目录
flume-ng agent --conf flume-1.9/conf --conf-file flume-log2hdfs1.conf -name a1 -Dflume.root.logger=INFO,consoleflume-ng agent --conf-file flume-log2hdfs1.conf -name a1 -Dflume.root.logger=INFO,console

Flume内存参数设置及优化:

  • 根据日志数据量大小,JVM堆一般要设置为4G或者更高
  • -Xms -Xmx最好设置一致,减少内存抖动带来的性能影响

自定义拦截器

前面FlumeAgent的配置使用了本地时间,可能导致数据存放的路径不正确。要解决上面的问题就需要使用自定义拦截器。
Agent用于测试自定义拦截器,source => logger sink
flumetest1.conf

# a1是agent的名称。source、channel、sink的名称分别为:r1 c1 k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = h122.wzk.icu
a1.sources.r1.port = 9999
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.CustomerInterceptor$Builder
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
# sink
a1.sinks.k1.type = logger
# source、channel、sink之间的关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

自定义拦截器原理

自定义拦截器的原理:

  • 自定义拦截器要集成 Flume 的 Interceptor
  • Event 分为 header 和 body (接收的字符串)
  • 获取 header 和 body
  • 从 body 中 获取 time,并将时间戳转换为字符串 yyyy-MM-dd
  • 将转换后的字符串放置到header中

自定义拦截器实现

自定义拦截器的实现:

  • 获取event的header
  • 获取event的body
  • 解析body获取json串
  • 解析json串获取时间戳
  • 将时间戳转换为字符串 yyyy-MM-dd
  • 将转换后的字符串放置header中
  • 返回event

导入依赖

<dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.23</version></dependency>
</dependencies>

编写代码

package icu.wzk;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class CustomerInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {// 这里是逐条处理String eventBody = new String(event.getBody(), StandardCharsets.UTF_8);// 获取Event的HeaderMap<String, String> headerMap = event.getHeaders();// 解析Body获取JSON字符串String[] bodyArr = eventBody.split("\\s+");try {String jsonStr = bodyArr[6];// 解析JSON字符串获取时间戳JSONObject jsonObject = JSON.parseObject(jsonStr);String timestampStr = jsonObject.getJSONObject("app_active").getString("time");// 将时间戳转换字符串 yyyy-MM-dd// 将字符串转换为Longlong timestampLong = Long.parseLong(timestampStr);DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");Instant instant = Instant.ofEpochMilli(timestampLong);LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());String date = formatter.format(localDateTime);// 将转换后的字符串放置header中headerMap.put("logtime", date);event.setHeaders(headerMap);} catch (Exception e) {headerMap.put("logtime", "Unknown");event.setHeaders(headerMap);}return event;}@Overridepublic List<Event> intercept(List<Event> list) {List<Event> lstEvent = new ArrayList<>();for (Event event : list) {Event outEvent = intercept(event);if (outEvent != null) {lstEvent.add(outEvent);}}return lstEvent;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new CustomerInterceptor();}@Overridepublic void configure(Context context) {}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/473381.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AI行业动态:AGI预测、模型进化与工具革新

本周&#xff0c;人工智能&#xff08;AI&#xff09;领域的新闻层出不穷&#xff0c;从关于通用人工智能&#xff08;AGI&#xff09;何时到来的预测&#xff0c;到模型训练与推理技术的突破&#xff0c;再到各种实用工具的更新迭代&#xff0c;精彩纷呈。让我们一起深入了解这…

vue3 如何调用第三方npm包内部的 pinia 状态管理库方法

抛砖引玉: 如果在开发vue3项目是, 引用了npm第三方包 ,而且这个包内使用了Pinia 状态管理库,那我们如何去调用 npm内部的 Pinia 状态管理库呢? 实际遇到的问题: 今天在制作npm包时遇到的问题,之前Vue2版本的时候状态管理库用的Vuex ,当时调用npm包内的状态管理库很简单,直接引…

AWTK-WIDGET-WEB-VIEW 实现笔记 (4) - Ubuntu

Ubuntu 上实现 AWTK-WIDGET-WEB-VIEW 开始以为很简单&#xff0c;后来发现是最麻烦的。因为 Ubuntu 上的 webview 库是 基于 GTK 的&#xff0c;而 AWTK 是基于 X11 的&#xff0c;两者的窗口系统不同&#xff0c;所以期间踩了几个大坑。 1. 编译 AWTK 在使用 Linux 的输入法时…

C++之内存管理

​ &#x1f339;个人主页&#x1f339;&#xff1a;喜欢草莓熊的bear &#x1f339;专栏&#x1f339;&#xff1a;C入门 目录 前言 一、C/C内存分配 二、 malloc、calloc、realloc、free 三、C内存管理方式 3.1 new/delete 操作内置类型 3.2 new和detele操作自定义类型…

Visual Studio 2017 快捷键设置-批量注释和批量取消注释

一.批量注释设置&#xff1a; 1&#xff09;打开Visual Studio 2017,点击菜单栏中的“工具”&#xff0c;然后选中“选项”&#xff1a; 2&#xff09;选中“键盘”&#xff0c;在“显示命令包含”输入框中输入“注释”&#xff1a; 3&#xff09;选中“编辑&#xff1a;注释选…

从零入门激光SLAM(二十三)——direct_visual_lidar_calibration全型号激光雷达-相机标定包

大家好呀&#xff0c;我是一个SLAM方向的在读博士&#xff0c;深知SLAM学习过程一路走来的坎坷&#xff0c;也十分感谢各位大佬的优质文章和源码。随着知识的越来越多&#xff0c;越来越细&#xff0c;我准备整理一个自己的激光SLAM学习笔记专栏&#xff0c;从0带大家快速上手激…

蓝桥杯备赛(持续更新)

16届蓝桥杯算法类知识图谱.pdf 1. 格式打印 %03d&#xff1a;如果是两位数&#xff0c;将会在前面添上一位0 %.2f&#xff1a;会保留两位小数 如果是long&#xff0c;必须在数字后面加上L。 2. 进制转化 2.1. 十进制转任意进制&#xff1a; 十进制转任意进制时&#xff…

【STL】set,multiset,map,multimap的介绍以及使用

关联式容器 在C的STL中包含序列式容器和关联式容器 1.关联式容器&#xff1a;它里面存储的是元素本身&#xff0c;其底层是线性序列的数据结构&#xff0c;比如&#xff1a;vector&#xff0c;list&#xff0c;deque&#xff0c;forward_list(C11)等 2.关联式容器里面储存的…

螺旋矩阵II(leetcode 59)

转圈过程&#xff08;边界处理&#xff09;遵循循环不变量的原则&#xff0c;坚持一个原则处理每一条边&#xff0c;左闭右开处理 class Solution { public:vector<vector<int>> generateMatrix(int n) {vector<vector<int>> num(n, vector<int>…

MCU的时钟体系

stm32F4的时钟体系图 1MHZ 10^6 HZ 系统时钟频率是168MHZ;AHB1、AHB2、AHB3总线上的时钟频率是168MHz;APB1总线上的时钟频率为42MHz&#xff1b;APB2总线上的时钟频率为84MHz&#xff1b; stm32F4的时钟体系图 在system_stm32f4xx.c文件中查看APB1和APB2的预分频值到底是多少…

走进嵌入式开发世界

目录 一、概述 二、嵌入式开发的核心要素 2.1. 硬件平台选择与设计 2.1.1. 处理器选择 2.1.2. 电路设计 2.1.3.硬件集成与测试 2.2. 软件开发与调试 2.2.1. 编程语言选择 2.2.2. 操作系统与中间件 2.2.3. 软件架构与模块化设计 2.2.4. 调试与测试 三、系统优化与功…

SpringCloud篇(服务网关 - GateWay)

目录 一、简介 二、为什么需要网关 二、gateway快速入门 1. 创建gateway服务&#xff0c;引入依赖 2. 编写启动类 3. 编写基础配置和路由规则 4. 重启测试 5. 网关路由的流程图 6. 总结 三、断言工厂 四、过滤器工厂 1. 路由过滤器的种类 2. 请求头过滤器 3. 默认…

技术理论||02空中三角测量

空中三角测量指的是根据少量控制点坐标,利用空间前后交汇,对六个外方位要素进行解算,从而获得大量未知点坐标及图像外方位要素。空三测量精度是整个摄影测量过程中的关键环节,空三解算的精度直接影响到数字正射图像、实景三维模型等数字化成果的质量。在空三加密的平差解算中,主…

OpenTelemetry 赋能DevOps流程的可观测性革命

作者&#xff1a;天颇 引言 在当今快节奏的软件开发和运维环境中&#xff0c;DevOps 已经成为主流&#xff0c;它通过整合开发和运维流程&#xff0c;推动着软件的快速迭代和持续交付。然而&#xff0c;随着微服务、容器化和云计算等技术的普及&#xff0c;系统复杂性急剧增加…

大数据如何助力干部选拔的公正性

随着社会的发展和进步&#xff0c;干部选拔成为组织管理中至关重要的一环。传统的选拔方式可能存在主观性、不公平性以及效率低下等问题。大数据技术的应用&#xff0c;为干部选拔提供了更加全面、精准、客观的信息支持&#xff0c;显著提升选拔工作的科学性和公正性。以下是大…

风电电力系统低碳调度论文阅读第一期

在碳交易市场中&#xff0c;历史法和基准线法是用于分配碳排放配额的两种主要方法。以下是两种方法的公式及其解释&#xff1a; 区别总结 历史法&#xff1a;基于历史排放量&#xff0c;分配具有较强的公平性但可能缺乏激励减排。基准线法&#xff1a;基于行业基准和生产量&am…

PHP代码审计 --MVC模型开发框架rce示例

MVC模型开发框架 控制器Controller&#xff1a;负责响应用户请求、准备数据&#xff0c;及决定如何展示数据 模块Model&#xff1a;管理业务逻辑和数据库逻辑&#xff0c;提供链接和操作数据库的抽象层 视图View&#xff1a;负责前端模板渲染数据&#xff0c;通过html呈现给用户…

RT-Thread 星火1号学习笔记

LED 闪烁例程 硬件说明 LED 连接在开发板的某个 GPIO 端口上&#xff0c;通过控制该端口的高低电平来实现 LED 的亮灭。 软件说明 初始化 GPIO 端口 /* 配置 LED 灯引脚 */ #define PIN_LED_B GET_PIN(F, 11) // PF11 : LED_B --> LED #defi…

c++调用 c# dll 通过 clr (详细避坑)

项目场景&#xff1a; .NET Framework 4.7.2 需要在纯C项目中调用C# 的DLL C# DLL 在.NET core 或者 .NET 8 中无法使用AOT正常导出DLL 解决方案&#xff1a; 通过 用 C/clr 项目中转 来调用 1.在c# .NET Framework项目中把接口写好&#xff0c; 这里不推荐使用 .NET 8&#…

【动手学深度学习Pytorch】1. 线性回归代码

零实现 导入所需要的包&#xff1a; # %matplotlib inline import random import torch from d2l import torch as d2l import matplotlib.pyplot as plt import matplotlib import os构造人造数据集&#xff1a;假设w[2, -3.4]&#xff0c;b4.2&#xff0c;存在随机噪音&…