大数据-玩转数据-Flume

一、Flume简介

    1. Flume提供一个分布式的,可靠的,对大数据量的日志进行高效收集、聚集、移动的服务,Flume只能在Unix环境下运行。
    1. Flume基于流式架构,容错性强,也很灵活简单。
    1. Flume、Kafka用来实时进行数据收集,Spark、Flink用来实时处理数据,impala用来实时查询。

二、Flume角色

2.1、Source

用于采集数据,Source是产生数据流的地方,同时Source会将产生的数据流传输到Channel,这个有点类似于Java IO部分的Channel。

2.2、Channel

用于桥接Sources和Sinks,类似于一个队列。

2.3、Sink

从Channel收集数据,将数据写到目标源(可以是下一个Source,也可以是HDFS或者HBase)。

2.4、Event

传输单元,Flume数据传输的基本单元,以事件的形式将数据从源头送至目的地。

三、Flume传输过程

source监控某个文件或数据流,数据源产生新的数据,拿到该数据后,将数据封装在一个Event中,并put到channel后commit提交,channel队列先进先出,sink去channel队列中拉取数据,然后写入到HDFS中。

四、Flume部署及使用

4.1 采集架构

在这里插入图片描述

4.2 Flume安装

4.2.1 下载

apache-flume-1.6.0-bin.tar.gz
链接:https://pan.baidu.com/s/1ySmEEObFtKtyT7GsEldnfA
提取码:436t

4.2.2 安装

Flume的安装非常简单,只需要解压即可
tar -zxvf apache-flume-1.6.0-bin.tar.gz
然后进入flume的目录,修改conf下的flume-env.sh,在里面配置JAVA_HOME

在这里,我们使用集群模式,因此,需要把在master节点部署的flume分发到slave节点上:
]# scp -rp apache-flume-1.7.0-bin slave1:KaTeX parse error: Expected 'EOF', got '#' at position 6: PWD ]#̲ scp -rp apache…PWD

4.2.3 测试

采集配置:

vi netcat-logger.conf
# 定义这个agent中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 描述和配置source组件:r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 描述和配置sink组件:k1
a1.sinks.k1.type = logger
# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 描述和配置source  channel   sink之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动agent去采集数据
启动命令:

bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console
-c conf   指定flume自身的配置文件所在目录
-f conf/netcat-logger.con  指定我们所描述的采集方案
-n a1  指定我们这个agent的名字

在这里插入图片描述
先要往agent采集监听的端口上发送数据,让agent有数据可采
发送命令:

安装telnet:

]# yum install telnet
]# telnet anget-hostname port (telnet localhost 44444)

测试输入输出如下图:
在这里插入图片描述
在这里插入图片描述

4.3 Flume配置

1)Flume 配置分析
在这里插入图片描述
Flume 直接读 log 日志的数据,log 日志的格式是 app-yyyy-mm-dd.log。
2)Flume 的具体配置如下:
(1)在/opt/module/flume/conf 目录下创建 file-flume-kafka.conf 文件

vim file-flume-kafka.conf
a1.sources=r1
a1.channels=c1 c2
#configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /usr/local/src/apache-flume-1.7.0-bin/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/log/2020-11-03/app.*.log
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.zgjy.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.zgjy.flume.interceptor.LogTypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_resource = c1
a1.sources.r1.selector.mapping.topic_action = c2
# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.channels.c1.kafka.topic = topic_resource
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer
# configure channe2
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.channels.c2.kafka.topic = topic_action
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer

测试日志:
在这里插入图片描述
配置说明如下:
在这里插入图片描述

4.4 Flume 的 ETL 和分类型拦截器

本项目中自定义了两个拦截器,分别是:ETL 拦截器、日志类型区分拦截器。
ETL 拦截器主要作用:过滤时间戳不合法和 Json 数据不完整的日志
日志类型区分拦截器主要作用:将启动日志和事件日志区分开来,方便发往 Kafka 的不 同 Topic。

1)创建 Maven 工程 flume-interceptor
2)创建包名:com.zgjy.flume.interceptor
3)在 pom.xml 文件中添加如下配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.zgjy</groupId><artifactId>flume-interceptor</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.41</version></dependency><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.7.0</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>2.5.3</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration></plugin></plugins></build></project>

4)在 com.zgjy.flume.interceptor 包下创建 LogETLInterceptor 类名
Flume ETL 拦截器 LogETLInterceptor实现代码如下:

package 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/190699.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机视觉:使用opencv实现银行卡号识别

1 概述 1.1 opencv介绍 OpenCV是Open Source Computer Vision Library&#xff08;开源计算机视觉库&#xff09;的简称&#xff0c;由Intel公司在1999年提出建立&#xff0c;现在由Willow Garage提供运行支持&#xff0c;它是一个高度开源发行的计算机视觉库&#xff0c;可以…

ESP32 C3 smartconfig一键配网报错

AP配网 在调试我的esp32c3的智能配网过程中&#xff0c;发现ap配网使用云智能App是可以正常配置的。 切记用户如果在menu菜单里使能AP配网&#xff0c;默认SSID名字为adh_PK值_MAC后6位。用户可以修改这个apssid的键值&#xff0c;但是要使用云智能app则这个名字的开头必须为ad…

电源基础元件

文章目录 电源基础元件理想电压源理想电流源受控电源 电源基础元件 理想电压源 定义 其两端电压总能保持定值或一定的时间函数&#xff0c;其值与流过它的电流i无关的元件叫理想电压源 理想电压源的电压、电流关系 1.电源两端电压由电源本身决定&#xff0c;与外电路无关&…

windows安装nginx

一、下载安装Nginx 1、官网下载地址&#xff1a;nginx: download 2、下载教程&#xff1a;选择最新的Stable version&#xff08;稳定版本&#xff09;下载到本地 3、下载完成后&#xff0c;解压放入本地非中文的文件夹中&#xff1a; 4、启动nginx&#xff1a;切勿直接双击n…

2390 高校实验室预约系统JSP【程序源码+文档+调试运行】

摘要 本文介绍了一个高校实验室预约系统的设计和实现。该系统包括管理员、教师和学生三种用户&#xff0c;具有基础数据管理、学生管理、教师管理、系统公告管理、实验室管理、实验室预约管理和系统管理等模块。通过数据库设计和界面设计&#xff0c;实现了用户友好的操作体验…

taro(踩坑) npm run dev:weapp 微信小程序开发者工具预览报错

控制台报错信息&#xff1a; VM72:9 app.js错误: Error: module vendors-node_modules_taro_weapp_prebundle_chunk-JUEIR267_js.js is not defined, require args is ./vendors-node_modules_taro_weapp_prebundle_chunk-JUEIR267_js.js 环境&#xff1a; node 版本&#x…

Python数据容器(序列操作)

序列 1.什么是序列 序列是指&#xff1a;内容连续、有序。可以使用下标索引的一类数据容器 列表、元组、字符串。均可以视为序列 2.序列的常用操作 - 切片 语法&#xff1a;序列[起始下标:结束下标:步长]起始下标表示从何处开始&#xff0c;可以留空&#xff0c;留空视作从…

华为ensp:为vlan配置ip

配置对应vlan的ip vlan1 interface Vlanif 1 进入vlan1 ip address 192.168.1.254 24配置IP为192.168.1.254 子网掩码为24位 这样就配置上ip了 vlan2 interface Vlanif 2 ip address 192.168.2.254 24 vlan3 interface Vlanif 3 ip address 192.168.3.254 24 查看结果 …

JDK更换版本不生效问题

JDK版本更换 问题: 当本地电脑拥有多个版本jdk时, 切换jdk版本不生效 解决方案: 1.查看环境变量(高版本的jdk安装时自动注入环境变量) 2.将Path里面的jdk的bin配置上移到最上面 3.查看jdk版本, java -version 启动项目,显示如下使用了jdk17

【Python小程序】浮点矩阵加减法

一、内容简介 本文使用Python编写程序&#xff0c;实现2个m * n矩阵的加、减法。具体过程如下&#xff1a; 给定两个m*n矩阵A和B&#xff0c;返回A与B的和或差。 二、求解方法 将两个矩阵对应位置上的元素相加。 三、Python代码 import numpy as np# 用户输入两个矩阵的维…

贝锐向日葵如何实现无人值守远程控制?

1.适用场景 &#xff08;1&#xff09;远程公司电脑应急办公&#xff08;2&#xff09;远程家里电脑游戏挂机&#xff08;3&#xff09;异地远程传输文件 2.操作步骤 &#xff08;1&#xff09;电脑安装向日葵个人版并登录贝锐账号&#xff08;点击注册&#xff09;&#xf…

木板上的蚂蚁(c++题解)

题目描述 有一块木板&#xff0c;长度为 n 个 单位 。一些蚂蚁在木板上移动&#xff0c;每只蚂蚁都以 每秒一个单位 的速度移动。其中&#xff0c;一部分蚂蚁向 左 移动&#xff0c;其他蚂蚁向 右 移动。 当两只向 不同 方向移动的蚂蚁在某个点相遇时&#xff0c;它们会同时改…

找工作的网站都有哪些

吉鹿力招聘网作为一家知名的招聘网站&#xff0c;因其功能完善和用户隐私保护而备受用户青睐。它不仅可以与企业直接沟通&#xff0c;还可以提供在线聊工作的机会。通过吉鹿力招聘网&#xff0c;用户可以自主选择工作地点、时间和工作类型&#xff0c;大大提高了找到合适工作的…

jupyter lab常用插件集合

❤️觉得内容不错的话&#xff0c;欢迎点赞收藏加关注&#x1f60a;&#x1f60a;&#x1f60a;&#xff0c;后续会继续输入更多优质内容❤️ &#x1f449;有问题欢迎大家加关注私戳或者评论&#xff08;包括但不限于NLP算法相关&#xff0c;linux学习相关&#xff0c;读研读博…

不同优化器的应用

简单用用&#xff0c;优化器具体参考 深度学习中的优化器原理(SGD,SGDMomentum,Adagrad,RMSProp,Adam)_哔哩哔哩_bilibili 收藏版&#xff5c;史上最全机器学习优化器Optimizer汇总 - 知乎 (zhihu.com) import numpy as np import matplotlib.pyplot as plt import torch # …

面向切面:AOP

面向切面&#xff1a;AOP 大家好&#xff0c;今天本篇博客我们来了解Spring里边的另一个重要部分&#xff0c;叫做AOP&#xff0c;也就是我们说的面向切面编程。 1、场景模拟 首先第一部分&#xff0c;咱们做一个场景模拟。我们先写一个简单的例子&#xff0c;然后通过例子引…

22.能被7整除,并且求和。

#include<stdio.h>int main(){int i ,sum0;printf("1-1000能被7整除的数字有&#xff1a;\n");for(i1;i<1000;i){if(i%70){printf("%d ",i);sumsumi;} }printf("\n");printf("能被7整除的数字的和是&#xff1a;%d ",sum);re…

Django之路由层

文章目录 路由匹配语法路由配置注意事项转换器注册自定义转化器 无名分组和有名分组无名分组有名分组 反向解析简介普通反向解析无名分组、有名分组之反向解析 路由分发简介为什么要用路由分发&#xff1f;路由分发实现 伪静态的概念名称空间虚拟环境什么是虚拟环境&#xff1f…

P5906 【模板】回滚莫队不删除莫队

这一题&#xff0c;虽说在洛谷标的是模板题&#xff0c;但可能没有“历史研究”那一题更加模板。 这一题相对于回滚莫队的模板题&#xff0c;可能在回滚的处理上稍微复杂了一点。对于回滚莫队就不多解释了&#xff0c;可以看一下 回滚莫队模板题 这一篇博客&#xff0c;稍微简单…

DMDEM部署说明-详细步骤-(DM8达梦数据库)

DMDEM部署说明-详细步骤-DM8达梦数据库 环境介绍1 部署DM8 数据库1.1 创建一个数据库作为DEM后台数据库1.2 创建数据库用户 DEM1.3 使用DEM用户导入dem_init.sql 2 配置tomcat2.1 配置/tomcat/conf/server.xml2.2 修改jvm启动参数 3 配置JAVA 1.8及以上版本的运行时环境3.1 配置…