大数据-227 离线数仓 - Flume 自定义拦截器(续接上节) 采集启动日志和事件日志

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!

目前开始更新 MyBatis,一起深入浅出!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(正在更新…)

章节内容

上节我们完成了如下的内容:

  • Flume 自定义拦截器
  • 拦截原理 拦截器实现 Java

在这里插入图片描述

自定义拦截器

(续接上节,上节已经到了打包的部分)

上传结果

将刚才的打包上传到这个目录下:
我拷贝的是带依赖的:“flume-test-1.0-SNAPSHOT-jar-with-dependencies.jar”

/opt/servers/apache-flume-1.9.0-bin/lib/

测试效果

我们创建刚才说的conf文件:

vim /opt/wzk/flume-conf/flumetest1.conf

编写的内容如下图所示:
在这里插入图片描述
启动进行测试:

flume-ng agent --conf-file /opt/wzk/flume-conf/flumetest1.conf -name a1 -Dflume.roog.logger=INFO,console

启动结果如下图所示:
z

我们启动 telnet 来传入数据:

telnet h122.wzk.icu 9999

启动之后输入数据:

2020-07-30 14:18:47.339 [main] INFO com.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"1","error_code":"0"},"time":1596111888529},"attr":{"area":"泰安","uid":"2F10092A9","app_v":"1.1.13","event_type":"common","device_id":"1FB872-9A1009","os_type":"4.7.3","channel":"DK","language":"chinese","brand":"iphone-9"}}

此时控制台的数据内容部分为:

4/08/27 15:31:31 INFO source.NetcatSource: Source starting
24/08/27 15:31:31 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/172.16.1.130:9999]
24/08/27 15:32:09 INFO sink.LoggerSink: Event: { headers:{logtime=2020-07-30} body: 32 30 32 30 2D 30 37 2D 33 30 20 31 34 3A 31 38 2020-07-30 14:18 }

对应的截图如下图所示:
在这里插入图片描述

采集启动日志(使用自定义拦截器)

配置文件

新建一个配置文件:

vim /opt flume-log2hdfs2.conf

写入的内容如下所示:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/wzk/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/wzk/logs/start/.*log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.CustomerInterceptor$Builder
# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000
# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream
# 配置文件滚动方式(文件大小32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# 向hdfs上刷新的event的个数
a1.sinks.k1.hdfs.batchSize = 1000
# 使用本地时间
# a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

内容的截图如下所示:
在这里插入图片描述
修改的内容如下:

  • 给source增加自定义拦截器
  • 去掉时间戳 a1.sinks.k1.hdfs.useLocalTimeStamp = true
  • 根据header中的logtime写文件

测试运行

flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs2.conf -name a1 -Dflume.roog.logger=INFO,console

拷贝日志

修改日志的内容:

vim /opt/wzk/logs/start/test.log

继续写入如下的内容:

2020-07-30 14:18:47.339 [main] INFO com.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"1","error_code":"0"},"time":1596111888529},"attr":{"area":"泰安","uid":"2F10092A9","app_v":"1.1.13","event_type":"common","device_id":"1FB872-9A1009","os_type":"4.7.3","channel":"DK","language":"chinese","brand":"iphone-9"}}

写入内容如下图所示:
在这里插入图片描述

测试效果

可以看到HDFS上,已经有了读出日志中的时间的内容:
在这里插入图片描述

采集启动日志和事件日志

本系统中要采集两种日志:

  • 启动日志
  • 事件日志
    不同的日志放置在不同的目录下,要想一次拿到全部日志需要监控多个目录。
    在这里插入图片描述

总体思路

  • taildir 监控多个目录
  • 修改自定义拦截器,不同来源的数据加上不同标志
  • HDFS、Sink 根据标志写文件

Agent 介绍

Flume 是一个分布式、高可靠、可用来收集、聚合和传输大量日志数据的系统。在 Flume 的体系结构中,Agent 是一个关键的组件。每个 Agent 是一个独立的 JVM 进程,负责从数据源获取数据并将其传递到下游(如文件系统、数据库、或者另一个 Agent)

Agent 的核心组成部分

Flume Agent 的架构是高度模块化的,它由以下三个核心组件构成:

Source (源)

Source 是数据流的起点,负责接收外部数据。它支持多种数据传输协议和格式,能够从日志文件、网络端口、消息队列等数据源中接收事件。

支持的 Source 类型:

  • Avro Source:接收来自其他 Flume Agent 的 Avro 格式数据。
  • Syslog Source:处理 Syslog 消息。
  • Exec Source:从本地命令或脚本的输出中读取数据。
  • HTTP Source:通过 HTTP 接口接收数据。
  • Spooling Directory Source:监控特定目录中的文件并读取内容。

Channel (通道)

Channel 是 Agent 的数据缓冲区域,用于在 Source 和 Sink 之间暂存事件。Channel 的设计保证了在数据流动中断(如网络故障)时,数据不会丢失。

常见的 Channel 类型:

  • Memory Channel:将事件存储在内存中,速度快,但可能会丢失数据(如果 Agent 崩溃)。
  • File Channel:将事件存储在磁盘文件中,提供高可靠性但性能较低。
  • Kafka Channel:使用 Kafka 作为中转通道,适合分布式场景。

Sink (接收器)

Sink 是数据流的终点,负责将事件传递到下游存储或处理系统。

支持的 Sink 类型:

  • HDFS Sink:将事件写入 Hadoop HDFS。
  • Kafka Sink:将事件发送到 Kafka。
  • Elasticsearch Sink:将事件写入 Elasticsearch。
  • Logger Sink:将事件输出到日志。
  • Avro Sink:将事件传递到另一个 Flume Agent 的 Avro Source。

Agent配置

vim /opt/wzk/flume-conf/flume-log2hdfs3.conf

写入的内容如下图所示:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/wzk/conf/startlog_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/wzk/logs/start/.*log
a1.sources.r1.headers.f1.logtype = start
a1.sources.r1.filegroups.f2 = /opt/wzk/logs/event/.*log
a1.sources.r1.headers.f2.logtype = event
# 自定义拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.LogTypeInterceptor$Builder
# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000
# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%
{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream
# 配置文件滚动方式(文件大小32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# 向hdfs上刷新的event的个数
a1.sinks.k1.hdfs.batchSize = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • filegroups:指定filegroups,可以有多个,以空格分割(taildir source可以同时监控多个目录中的文件)
  • headers.filegroups.headerKey

给Event增加header Key,不同的filegroup,可配置不同的value。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/476008.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CANoe录制和回放CAN报文

目录 1、录制报文 2、离线回放 3、在线回放 3.1、在线回放设置 CANoe是一款用于汽车电子测试的工具,它可以模拟CAN网络中的各种设备,并支持CAN报文的录制和回放功能,方便我们远程调试。 1、录制报文 在Measurement Setupk面板点击Loggi…

大数据调度组件之Apache DolphinScheduler

Apache DolphinScheduler 是一个分布式易扩展的可视化 DAG 工作流任务调度系统。致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。 主要特性 易于部署,提供四种部署方式,包括Standalone、Cluster、Docker和…

XCode Build时遇到 .entitlements could not be opened 的问题

遇到错误 在构建成功的XCode工程上,手动打开XCode并Build,遇到以下问题: The file .entitlements could not be opened. Did you forget to declare this file as an output of a script phase or custom build rule which produces it 打…

关于一次开源java spring快速开发平台项目RuoYi部署的记录

关于一次开源java spring快速开发平台项目RuoYi部署的记录 本次因为需要一些练习环境,想要快速搭建一个javaweb 项目作为练习环境,经过查询和实验找到一个文档详细,搭建简单,架构也相对比较新的开源项目RuoYi。 项目介绍&#xf…

原生微信小程序在顶部胶囊左侧水平设置自定义导航兼容各种手机模型

无论是在什么手机机型下,自定义的导航都和右侧的胶囊水平一条线上。如图下 以上图iphone12,13PRo 以上图是没有带黑色扇帘的机型 以下是调试器看的wxml的代码展示 注意:红色阔里的是自定义导航(或者其他的logo啊,返回之…

列出D3的所有交互方法,并给出示例

D3.js 提供了丰富的交互方法,可以用来增强图表的用户交互体验。以下是一些常用的交互方法及其示例: 1. 鼠标事件 on("mouseover", function) 用途: 当鼠标悬停在元素上时触发。示例:svg.selectAll(".bar").on("mouseover&qu…

小程序-使用 iconfont 图标库报错:Failed to load font

官方默认可以忽略此错误,在清除缓存后首次刷新会显示此错误,重新渲染错误消失 解决方法: 在 iconfont 图标库选择项目设置 选中 Base64 保存,重新点击链接 -> 复制代码到项目中 操作步骤:

[免费]SpringBoot+Vue毕业设计论文管理系统【论文+源码+SQL脚本】

大家好,我是java1234_小锋老师,看到一个不错的SpringBootVue毕业设计论文管理系统,分享下哈。 项目视频演示 【免费】SpringBootVue毕业设计论文管理系统 Java毕业设计_哔哩哔哩_bilibili 项目介绍 现代经济快节奏发展以及不断完善升级的信…

System Control Units (SCU)

本文对Ifx TC3xx的System Control Units (SCU)模块进行介绍,此网页为汇总连接,具体模块见对应超链接。 系统控制单元(SCU)是一组控制各种系统功能的子模块,包括以下模块: Reset Control (RCU)Trap genera…

网站推广实战案例:杭州翔胜科技有限公司如何为中小企业打开市场大门

以下是以杭州翔胜科技有限公司为例,解析其如何通过网站推广为中小企业打开市场大门的实战案例: 一、一站式网站推广方案 杭州翔胜科技有限公司提供一站式网站推广方案,该方案整合了多种推广手段,如搜索引擎优化(SEO&a…

Spring Cloud Stream实现数据流处理

1.什么是Spring Cloud Stream? 我看很多回答都是“为了屏蔽消息队列的差异,使我们在使用消息队列的时候能够用统一的一套API,无需关心具体的消息队列实现”。 这样理解是有些不全面的,Spring Cloud Stream的核心是Stream&#xf…

OpenMMlab导出Mask R-CNN模型并用onnxruntime和tensorrt推理

onnxruntime推理 使用mmdeploy导出onnx模型: from mmdeploy.apis import torch2onnx from mmdeploy.backend.sdk.export_info import export2SDKimg demo.JPEG work_dir ./work_dir/onnx/mask_rcnn save_file ./end2end.onnx deploy_cfg mmdeploy/configs/mmd…

【大语言模型】ACL2024论文-19 SportsMetrics: 融合文本和数值数据以理解大型语言模型中的信息融合

【大语言模型】ACL2024论文-19 SportsMetrics: 融合文本和数值数据以理解大型语言模型中的信息融合 https://arxiv.org/pdf/2402.10979 目录 文章目录 【大语言模型】ACL2024论文-19 SportsMetrics: 融合文本和数值数据以理解大型语言模型中的信息融合目录摘要研究背景问题与挑…

39页PDF | 毕马威_数据资产运营白皮书(限免下载)

一、前言 《毕马威数据资产运营白皮书》探讨了数据作为新型生产要素在企业数智化转型中的重要性,提出了数据资产运营的“三要素”(组织与意识、流程与规范、平台与工具)和“四重奏”(数据资产盘点、评估、治理、共享)…

【UE5】使用基元数据对材质传参,从而避免新建材质实例

在项目中,经常会遇到这样的需求:多个模型(例如 100 个)使用相同的材质,但每个模型需要不同的参数设置,比如不同的颜色或随机种子等。 在这种情况下,创建 100 个实例材质不是最佳选择。正确的做…

[STBC]

空时分组编码STBC(Space Time Block Coding): //一个数据流通过多个天线发射发送,硬件编码器 STBC概念是从MIMO技术衍生出来的,目的是在多天线系统中提高数据传输的可靠性和传输距离。在rx(接收天线)和tx&…

241120学习日志——[CSDIY] [InternStudio] 大模型训练营 [09]

CSDIY:这是一个非科班学生的努力之路,从今天开始这个系列会长期更新,(最好做到日更),我会慢慢把自己目前对CS的努力逐一上传,帮助那些和我一样有着梦想的玩家取得胜利!!&…

PCB 间接雷击模拟

雷击是一种危险的静电放电事件,其中两个带电区域会瞬间释放高达 1 千兆焦耳的能量。雷击就像一个短暂而巨大的电流脉冲,会对建筑物和电子设备造成严重损坏。雷击可分为直接和间接两类,其中间接影响是由于感应能量耦合到靠近雷击位置的物体。间…

IDEA2019搭建Springboot项目基于java1.8 解决Spring Initializr无法创建jdk1.8项目 注释乱码

后端界面搭建 将 https://start.spring.io/ 替换https://start.aliyun.com/ 报错 打开设置 修改如下在这里插入代码片 按此方法无果 翻阅治疗后得知 IDEA2019无法按照网上教程修改此问题因此更新最新idea2024或利用插件Alibaba Clouod Toolkit 换用IDEA2024创建项目 下一步…

单向C to DP视频传输解决方案 | LDR6500

LDR6500D如何通过Type-C接口实现手机到DP接口的单向视频传输 在当今数字化浪潮中,投屏技术作为连接设备、共享视觉内容的桥梁,其重要性日益凸显。PD(Power Delivery)芯片,特别是集成了Type-C接口与DisplayPort&#xf…