【数据采集与预处理】流数据采集工具Flume

目录

一、Flume简介

(一)Flume定义

(二)Flume作用

二、Flume组成架构

三、Flume安装配置

(一)下载Flume

(二)解压安装包

(三)配置环境变量

(四)查看Flume版本信息

四、Flume的运行

(一)Telnet准备工作

(二)使用Avro数据源测试Flume

(三)使用netcat数据源测试Flume

五、Flume作为Spark Streaming数据源

(一)Spark准备工作

(二)使用Flume作为Spark Streaming数据源


一、Flume简介

数据流 :数据流通常被视为一个随时间延续而无限增长的动态数据集合,是一组顺序、大量、快速、连续到达的数据序列。通过对流数据处理,可以进行卫星云图监测、股市走向分析、网络攻击判断、传感器实时信号分析。

(一)Flume定义

        Apache Flume是一种分布式、具有高可靠和高可用性的数据采集系统,可从多个不同类型、不同来源的数据流汇集到集中式数据存储系统中。Flume 基于流式架构,灵活简单。

(二)Flume作用

        Flume最主要的作用就是,实时读取服务器本地磁盘的数据,可将日志采集后传输到HDFS、Hive、HBase、Kafka等大数据组件。

二、Flume组成架构

1、Agent
        Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的,是 Flume 数据传输的基本单元。Agent 主要有 3 个部分组成,Source、Channel、Sink。

2、Source
        Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。

3、Channel
        Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运作在不同的速率上。Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个 Sink的读取操作。
Flume 自带两种 Channel:Memory Channel 和 File Channel。
Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。
File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。

4、 Sink
        Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
        Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。
        Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。

5、Event
        传输单元,Flume 数据传输的基本单元,以事件的形式将数据从源头送至目的地。

Flume Agent 内部原理:

三、Flume安装配置

(一)下载Flume

到Flume官网下载Flume1.7.0安装文件,下载地址如下:

http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

下载完成后上传到虚拟机的“/usr/local/uploads”目录下。

(二)解压安装包

首先进入到“uploads”目录下。将压缩包解压到“/usr/local”目录下

[root@bigdata zhc]# cd /usr/local/uploads
[root@bigdata uploads]# tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /usr/local

将解压的文件修改名字为flume,简化操作。把/usr/local/flume目录的权限赋予当前登录Linux系统的用户。

[root@bigdata uploads]# cd /usr/local
[root@bigdata local]# mv apache-flume-1.7.0-bin flume
[root@bigdata local]# chown -R zhc:zhc ./flume

 

(三)配置环境变量

首先,修改/etc/profile配置文件:

[root@bigdata local]# vi /etc/profile

export FLUME_HOME=/usr/local/flume
export PATH=$PATH:$FLUME_HOME/bin
export FLUME_CONF_DIR=$FLUME_HOME/conf

使文件生效:

[root@bigdata local]# source /etc/profile

下面修改 flume-env.sh 配置文件:

[root@bigdata local]# cd /usr/local/flume/conf
[root@bigdata conf]# cp flume-env.sh.template flume-env.sh
[root@bigdata conf]# vi flume-env.sh

在文件中增加一行内容,用于设置JAVA_HOME变量:

export JAVA_HOME=/usr/local/servers/jdk

然后,保存flume-env.sh文件,并退出vim编辑器。

(四)查看Flume版本信息

[root@bigdata conf]# cd /usr/local/flume
[root@bigdata flume]# ./bin/flume-ng version

然后就会发现如下报错: “错误: 找不到或无法加载主类”

原因分析:
(1)jdk 冲突
(2)安装了HBase就会报着个错

解决方法:

到“/usr/local/flume/bin”目录下修改flume-ng文件。

[root@bigdata flume]# cd /usr/local/flume/bin
[root@bigdata bin]# vi flume-ng

在文件中加入以下内容:

2>/dev/null | grep hbase

再次查看flume版本信息。

四、Flume的运行

(一)Telnet准备工作

后面的步骤中要用到telnet,在这里先安装:

[root@bigdata zhc]# yum install telnet

(二)使用Avro数据源测试Flume

        Avro可以发送一个给定的文件给Flume,Avro 源使用AVRO RPC机制。请对Flume的相关配置文件进行设置,从而可以实现如下功能:在一个终端中新建一个文件helloworld.txt(里面包含一行文本“Hello World”),在另外一个终端中启动Flume以后,可以把helloworld.txt中的文本内容显示出来。

1、创建agent配置文件

[root@bigdata zhc]# cd /usr/local/flume/conf
[root@bigdata conf]# vi avro.conf

在文件中加入以下内容: 

#/usr/local/flume/conf/avro.confa1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = avroa1.sources.r1.channels = c1a1.sources.r1.bind = 0.0.0.0a1.sources.r1.port = 4141#注意这个端口名,在后面的教程中会用得到# Describe the sinka1.sinks.k1.type = logger# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1

上面Avro Source参数说明如下:
        Avro Source的别名是avro,也可以使用完整类别名称org.apache.flume.source.AvroSource,因此,上面有一行设置是a1.sources.r1.type = avro,表示数据源的类型是avro。
        bind绑定的ip地址或主机名,使用0.0.0.0表示绑定机器所有的接口。a1.sources.r1.bind = 0.0.0.0,就表示绑定机器所有的接口。
        port表示绑定的端口。a1.sources.r1.port = 4141,表示绑定的端口是4141。
        a1.sinks.k1.type = logger,表示sinks的类型是logger。

2、启动flume agent a1

[root@bigdata conf]# /usr/local/flume/bin/flume-ng agent -c . -f /usr/local/flume/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console 

这个终端不要关闭。

3、创建指定文件

新建一个终端,输入以下命令:

[root@bigdata mycode]# cd /home/zhc
[root@bigdata zhc]# cd /home/zhc/mycode
[root@bigdata mycode]# mkdir flume
[root@bigdata mycode]# cd flume
[root@bigdata flume]# echo "Hello World">> ./helloworld.txt
[root@bigdata flume]# /usr/local/flume/bin/flume-ng avro-client --conf conf -H localhost -p 4141 -F /home/zhc/mycode/flume/helloworld.txt

执行之后,我们就可以在前面不让关闭的那个终端看到Hello World了:

(三)使用netcat数据源测试Flume

        请对Flume的相关配置文件进行设置,从而可以实现如下功能:在一个Linux终端(这里称为“Flume终端”)中,启动Flume,在另一个终端(这里称为“Telnet终端”)中,输入命令“telnet localhost 44444”,然后,在Telnet终端中输入任何字符,让这些字符可以顺利地在Flume终端中显示出来。

1、创建netcat的agent配置

[root@bigdata conf]# cd /usr/local/flume/conf
[root@bigdata conf]# vi netcat.conf

在文件中加入以下内容:  

#/usr/local/flume/conf/netcat.conf# Name the components on this agent  a1.sources = r1  a1.sinks = k1  a1.channels = c1  # Describe/configure the source  a1.sources.r1.type = netcat  a1.sources.r1.bind = localhost  a1.sources.r1.port = 44444 #同上,记住该端口名# Describe the sink  a1.sinks.k1.type = logger  # Use a channel which buffers events in memory  a1.channels.c1.type = memory  a1.channels.c1.capacity = 1000  a1.channels.c1.transactionCapacity = 100  # Bind the source and sink to the channel  a1.sources.r1.channels = c1  a1.sinks.k1.channel = c1  

2、启动flume agent

[root@bigdata conf]# /usr/local/flume/bin/flume-ng agent --conf /usr/local/flume/conf --conf-file /usr/local/flume/conf/netcat.conf --name a1 -Dflume.root.logger=INFO,console

这个终端不要关闭。

3、新建一个终端输入

[root@bigdata flume]# telnet localhost 44444

在这个终端输入字符串就可以显示在前面那个终端里了,但是中文是不支持的,显示长度也有限。

五、Flume作为Spark Streaming数据源

(一)Spark准备工作

1、下载spark-streaming-flume_2.11-2.3.4.jar

首先,到官网下载spark-streaming-flume_2.11-2.3.4.jar:

https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume

上面的网址要是打不开,可以用下面的这个网址:

https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-flume_2.11

2、把这个jar文件放到“/usr/local/spark/jars/flume”目录下

[root@bigdata flume]# cd /usr/local/spark/jars
[root@bigdata jars]# mkdir flume
[root@bigdata jars]# cd flume
[root@bigdata flume]# cp /usr/local/uploads/spark-streaming-flume_2.11-2.3.4.jar .

注意:此处不要将“/usr/local/flume/lib”目录下的所有jar包都拷贝到“/usr/local/spark/jars/flume” 目录下,不然会使Spark和Hadoop版本与Guava库的版本不兼容,从而导致后面运行程序时会报错!

错误如下图所示:

3、修改spark-env.sh文件

修改spark目录下conf/spark-env.sh文件中的SPARK_DIST_CLASSPATH变量。把flume的相关jar包添加到此文件中。

[root@bigdata flume]# cd /usr/local/spark/conf
[root@bigdata conf]# vi spark-env.sh

将如下内容加到文件中: 

:/usr/local/spark/jars/flume/*:/usr/local/flume/lib/*

这样,Spark环境就准备好了。

(二)使用Flume作为Spark Streaming数据源

        Flume是非常流行的日志采集系统,可以作为Spark Streaming的高级数据源。请把Flume Source设置为netcat类型,从终端上不断给Flume Source发送各种消息,Flume把消息汇集到Sink,这里把Sink类型设置为avro,由Sink把消息推送给Spark Streaming,由自己编写的Spark Streaming应用程序对消息进行处理。

1、创建flume-to-spark.conf

[root@bigdata conf]# cd /usr/local/flume/conf
[root@bigdata conf]# vi flume-to-spark.conf

输入以下内容: 

#/usr/local/flume/conf/flume-to-spark.conf
#flume-to-spark.conf: A single-node Flume configuration# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = netcata1.sources.r1.bind = localhosta1.sources.r1.port = 33333# Describe the sinka1.sinks.k1.type = avroa1.sinks.k1.hostname = localhosta1.sinks.k1.port =44444# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000000a1.channels.c1.transactionCapacity = 1000000# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1

#说明:
1、Flume suorce类为netcat,绑定到localhost的33333端口,消息可以通过telnet localhost 33333 发送到flume suorce
2、Flume Sink类为avro,绑定44444端口,flume sink通过localhost 44444端口把消息发送出来。而spark streaming程序一直监听44444端口。

#注意!!先不要启动Flume agent,因为44444端口还没打开,sink的消息无处可去,44444端口由spark streaming程序打开。

2、编写Spark程序使用Flume数据源

(1)创建python文件

[root@bigdata flume]# cd /home/zhc/mycode/flume
[root@bigdata flume]# vi FlumeEventCount.py

在FlumeEventCount.py中输入以下代码:  

#/home/zhc/mycode/flume/FlumeEventCount.pyfrom __future__ import print_functionimport sysfrom pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils
import pyspark
if __name__ == "__main__":if len(sys.argv) != 3:print("Usage: flume_wordcount.py <hostname> <port>", file=sys.stderr)exit(-1)sc = SparkContext(appName="FlumeEventCount")ssc = StreamingContext(sc, 2)hostname= sys.argv[1]port = int(sys.argv[2])stream = FlumeUtils.createStream(ssc, hostname, port,pyspark.StorageLevel.MEMORY_AND_DISK_SER_2)stream.count().map(lambda cnt : "Recieve " + str(cnt) +" Flume events!!!!").pprint()ssc.start()ssc.awaitTermination()
~                               

注意:可能需要安装pyspark,命令为:

[root@bigdata flume]# pip3 install pyspark

(2)测试实际效果

 首先,启动Spark streaming程序:

[root@bigdata flume]# cd /usr/local/spark
[root@bigdata spark]# ./bin/spark-submit --driver-class-path /usr/local/spark/jars/*:/usr/local/spark/jars/flume/* /home/zhc/mycode/flume/FlumeEventCount.py localhost 44444

然后,启动一个新的终端,启动Flume Agent:

[root@bigdata zhc]# cd /usr/local/flume
[root@bigdata flume]# bin/flume-ng agent --conf ./conf --conf-file ./conf/flume-to-spark.conf --name a1 -Dflume.root.logger=INFO,console

最后,再启动一个新的终端连接33333端口:

现在你可以在最后这个终端里输入一些字符了。在你输入字符后可以看到第一个终端会显示如下的信息:

-------------------------------------------
Time: 1488029430000 ms
-------------------------------------------
Received 1 flume events!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/235616.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python高校舆情分析系统+可视化+情感分析 舆情分析+Flask框架(源码+文档)✅

毕业设计&#xff1a;2023-2024年计算机专业毕业设计选题汇总&#xff08;建议收藏&#xff09; 毕业设计&#xff1a;2023-2024年最新最全计算机专业毕设选题推荐汇总 &#x1f345;感兴趣的可以先收藏起来&#xff0c;点赞、关注不迷路&#xff0c;大家在毕设选题&#xff…

适用于 Windows 的 12 个最佳免费磁盘分区管理器软件

分区是与其他部分分开的硬盘驱动器部分。它使您能够将硬盘划分为不同的逻辑部分。分区软件是一种工具&#xff0c;可帮助您执行基本选项&#xff0c;例如创建、调整大小和删除物理磁盘的分区。许多此类程序允许您更改磁盘片的标签以便于识别数据。 适用于 Windows 的 12 个最佳…

【PaperReading】3. PTP

Category Content 论文题目 Position-guided Text Prompt for Vision-Language Pre-training Code: ptp 作者 Alex Jinpeng Wang (Sea AI Lab), Pan Zhou (Sea AI Lab), Mike Zheng Shou (Show Lab, National University of Singapore), Shuicheng Yan (Sea AI Lab) 另一篇…

爬虫01-爬虫原理以及爬虫前期准备工作

文章目录 1 爬虫基本原理什么是爬虫爬虫功能详解爬虫基本流程两个概念&#xff1a;request和response 2 一些问题爬虫能抓取什么样的数据&#xff1f;抓取的数据怎么提取部分内容&#xff1f;数据解析方式。为什么我爬虫抓取的数据和浏览器看到的不一样怎样解决JavaScript渲染的…

计算数学表达式的程序(Java课程设计)

1. 课设团队介绍 团队名称 团队成 员介绍 任务分配 团队成员博客 XQ Warriors 徐维辉 负责计算器数据的算法操作&#xff0c;如平方数、加减乘除&#xff0c;显示历史计算记录 无 邱良厦&#xff08;组长&#xff09; 负责计算器的图形设计&#xff0c;把输入和结果显…

公共用例库计划--个人版(二)主体界面设计

1、任务概述 计划内容&#xff1a;完成公共用例库的开发实施工作&#xff0c;包括需求分析、系统设计、开发、测试、打包、运行维护等工作。 1.1、 已完成&#xff1a; 需求分析、数据库表的设计&#xff1a;公共用例库计划–个人版&#xff08;一&#xff09; 1.2、 本次待完…

2024新年烟花代码完整版

文章目录 前言烟花效果展示使用教程查看源码HTML代码CSS代码JavaScript 新年祝福 前言 在这个充满希望和激动的2024年&#xff0c;新的一年即将拉开帷幕&#xff0c;而数字科技的创新与发展也如火如荼。烟花绚丽多彩的绽放&#xff0c;一直以来都是新年庆典中不可或缺的元素。…

微信小程序 组件component ts用法

还在为 使用了ts 但是组件内显示this.setData/this.data.xxx ts报错 觉得难看吗&#xff1f; 还在为明明定义了applyInfo&#xff0c;明明应该有setData为何报错&#xff1f; 还在为不知道如何写类型而烦心吗&#xff1f; 不如转变思路将methods看成为一个对象 增加断言 as a…

实现多级缓存(Redis+Caffeine)

文章目录 多级缓存的概述多级缓存的优势 多级缓存的概述 在高性能的服务架构设计中&#xff0c;缓存是一个不可或缺的环节。在实际的项目中&#xff0c;我们通常会将一些热点数据存储到Redis或MemCache这类缓存中间件中&#xff0c;只有当缓存的访问没有命中时再查询数据库。在…

公网环境使用移动端设备+cpolar远程访问本地群晖nas上的影视资源

文章目录 1.使用环境要求&#xff1a;2.下载群晖videostation&#xff1a;3.公网访问本地群晖videostation中的电影&#xff1a;4.公网条件下使用电脑浏览器访问本地群晖video station5.公网条件下使用移动端&#xff08;搭载安卓&#xff0c;ios&#xff0c;ipados等系统的设备…

小家电应用解决方案以及选型指南

电磁炉是现代厨房中常见的一种小家电产品&#xff0c;它利用电磁感应加热原理&#xff0c;可以快速、高效地进行烹饪。在电磁炉的设计和制造过程中&#xff0c;功率开关芯片的选择对于产品的性能和成本有着重要的影响。 针对电磁炉的应用需求&#xff0c;推荐采用LED驱动芯片S…

蓝桥杯省赛无忧 STL 课件12 vector

01 vector的定义和特性 02 vector的常用函数 03 vector排序去重 示例&#xff1a; #include<bits/stdc.h> using namespace std; int main(){vector<int> vec {5,2,8,1,9};sort(vec.begin(),vec.end());for(const auto& num : vec){cout<<num<<&q…

Centos7升级openssl到openssl1.1.1

Centos7升级openssl到openssl1.1.1 1、先查看openssl版本&#xff1a;openssl version 2、Centos7升级openssl到openssl1.1.1 升级步骤 #1、更新所有现有的软件包列表并安装最新的软件包&#xff1a; $sudo yum update #2、接下来&#xff0c;我们需要从源代码编译和构建OpenS…

【原生部署】SpringBoot+Vue前后端分离项目

本次主要讲解SpringBootVue前后端完全分离项目在CentOS云服务器上的环境搭建与部署过程&#xff0c;我们主要讲解原生部署。 一.原生部署概念 原生部署是指将应用程序&#xff08;一般是指软件、应用或服务&#xff09;在底层的操作系统环境中直接运行和部署&#xff0c;而不…

微软Office 2019 批量授权版

软件介绍 微软办公软件套件Microsoft Office 2019 专业增强版2024年1月批量许可版更新推送&#xff01;Office2019正式版2018年10月份推出&#xff0c;主要为多人跨平台办公与团队协作打造。Office2019整合对过去三年在Office365里所有功能&#xff0c;包括对Word、Excel、Pow…

Docker的介绍及安装基本操作命令

前言 Docker 是一个开源的应用容器引擎&#xff0c;基于 Go 语言 并遵从 Apache2.0 协议开源。 Docker 可以让开发者打包他们的应用以及依赖包到一个轻量级、可移植的容器中&#xff0c;然后发布到任何流行的 Linux 机器上&#xff0c;也可以实现虚拟化。 容器是完全使用沙箱…

基于Selenium+Python的web自动化测试框架

一、什么是Selenium&#xff1f; Selenium是一个基于浏览器的自动化测试工具&#xff0c;它提供了一种跨平台、跨浏览器的端到端的web自动化解决方案。Selenium主要包括三部分&#xff1a;Selenium IDE、Selenium WebDriver 和Selenium Grid。 Selenium IDE&#xff1a;Firefo…

阿里云和AWS之间的应用程序防火墙比较及选择建议!

对于大多数开发人员来说&#xff0c;托管在云中的 Web 应用程序或 REST API 是一种常见方案。但是&#xff0c;并非每个应用程序都具有相同的安全级别。将 Web 应用程序防火墙 &#xff08;WAF&#xff09; 添加到 Web 应用程序是提高安全性的有用方法。 在本文中&#xff0c;…

Python实用小工具(4)——邮件轰炸机,给朋友搞点乐子(附源码+exe文件)

欢迎来到MatpyMaster&#xff01;今天我们将使用Python来批量发送邮件&#xff0c;让你的邮件推送变得更加高效。废话不多说&#xff0c;直接开搞&#xff01;使用声明&#xff1a; 请确保你的邮箱开启了SMTP服务&#xff0c;并获取了授权码。 选择合适的发送间隔&#xff0c;…

VBA中类的解读及应用第八讲:实现定时器功能的自定义类事件

《VBA中类的解读及应用》教程【10165646】是我推出的第五套教程&#xff0c;目前已经是第一版修订了。这套教程定位于最高级&#xff0c;是学完初级&#xff0c;中级后的教程。 类&#xff0c;是非常抽象的&#xff0c;更具研究的价值。随着我们学习、应用VBA的深入&#xff0…