Flume的安装及使用

Flume的安装及使用

Flume的安装及使用Flume的安装1、上传至虚拟机,并解压2、重命名目录,并配置环境变量3、查看flume版本4、测试flume5、flume的使用6、多sink7、多Agent

Flume的安装

1、上传至虚拟机,并解压
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /usr/local/soft/

在环境变量中增加如下命令,可以使用 soft 快速切换到 /usr/local/soft

alias soft='cd /usr/local/soft/'

2、重命名目录,并配置环境变量
mv apache-flume-1.9.0-bin/ flume-1.9.0
vim /etc/profile
export FLUME_HOME=/usr/local/soft/flume-1.9.0
export PATH=$PATH:$FLUME_HOME/bin
source /etc/profile
3、查看flume版本
flume-ng version
[root@master soft]# flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
[root@master soft]# 
4、测试flume
  • 监控一个目录,将数据打印出来

    • 配置文件

    # 首先先给agent起一个名字 叫a1
    # 分别给source channel sink取名字
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    ​
    # 分别对source、channel、sink进行配置
    ​
    # 配置source
    # 将source的类型指定为 spooldir 用于监听一个目录下文件的变化
    # 因为每个组件可能会出现相同的属性名称,所以在对每个组件进行配置的时候 
    # 需要加上 agent的名字.sources.组件的名字.属性 = 属性值
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /root/data/
    a1.sources.r1.fileSuffix = .ok
    a1.sources.r1.fileHeader = true
    ​
    # 配置channel
    # 将channel的类型设置为memory,表示将event缓存在内存中
    a1.channels.c1.type = memory
    ​
    # 配置sink
    # 使用logger作为sink组件,可以将收集到数据直接打印到控制台
    a1.sinks.k1.type = logger
    ​
    # 组装
    # 将sources的channels属性指定为c1
    a1.sources.r1.channels = c1
    ​
    # 将sinks的channel属性指定为c1
    a1.sinks.k1.channel = c1
    • 启动agent

    flume-ng agent -n a1 -f ./spoolingtest.conf -Dflume.root.logger=DEBUG,console
    • 新建/root/data目录

    mkdir /root/data
    • 在/root/data/目录下新建文件,输入内容,观察flume进程打印的日志

    # 随意在a.txt中加入一些内容
    vim /root/data/a.txt
5、flume的使用
  • spoolingToHDFS.conf

    • 配置文件

    # a表示给agent命名为a
    # 给source组件命名为r1
    a.sources = r1
    # 给sink组件命名为k1
    a.sinks = k1 
    # 给channel组件命名为c1
    a.channels = c1
    ​
    #指定spooldir的属性
    a.sources.r1.type = spooldir 
    a.sources.r1.spoolDir = /root/data 
    a.sources.r1.fileHeader = true 
    ​
    #指定sink的类型
    a.sinks.k1.type = hdfs
    a.sinks.k1.hdfs.path = /data/flume/students
    # 指定文件名前缀
    a.sinks.k1.hdfs.filePrefix = student
    # 指定达到多少数据量写一次文件 单位:bytes
    a.sinks.k1.hdfs.rollSize = 102400
    # 指定多少条写一次文件
    a.sinks.k1.hdfs.rollCount = 1000
    # 指定文件类型为 流 来什么输出什么
    a.sinks.k1.hdfs.fileType = DataStream
    # 指定文件输出格式 为text
    a.sinks.k1.hdfs.writeFormat = text
    # 指定文件名后缀
    a.sinks.k1.hdfs.fileSuffix = .txt
    ​
    #指定channel
    a.channels.c1.type = memory 
    a.channels.c1.capacity = 1000
    # 表示sink每次会从channel里取多少数据
    a.channels.c1.transactionCapacity = 100
    ​
    # 组装
    a.sources.r1.channels = c1 
    a.sinks.k1.channel = c1
    • 在 /root/data/目录下准备数据

    1500100003,单乐蕊,22,女,理科六班
    1500100004,葛德曜,24,男,理科三班
    1500100005,宣谷芹,22,女,理科五班
    1500100006,边昂雄,21,男,理科二班
    1500100007,尚孤风,23,女,文科六班
    1500100008,符半双,22,女,理科六班
    1500100009,沈德昌,21,男,理科一班
    1500100010,羿彦昌,23,男,理科六班
    1500100011,宰运华,21,男,理科三班
    1500100012,梁易槐,21,女,理科一班
    1500100013,逯君昊,24,男,文科二班
    1500100014,羿旭炎,23,男,理科五班
    1500100015,宦怀绿,21,女,理科一班
    1500100016,潘访烟,23,女,文科一班
    1500100017,高芷天,21,女,理科五班
    • 启动agent

    flume-ng agent -n a -f ./spoolingToHDFS.conf -Dflume.root.logger=DEBUG,console
  • hbaseLogToHDFS

    • 配置文件

    # a表示给agent命名为a
    # 给source组件命名为r1
    a.sources = r1
    # 给sink组件命名为k1
    a.sinks = k1 
    # 给channel组件命名为c1
    a.channels = c1
    ​
    #指定exec的属性
    a.sources.r1.type = exec 
    a.sources.r1.command = tail -200f /usr/local/soft/hbase-2.2.7/logs/hbase-root-master-master.log
    ​
    #指定sink的类型
    a.sinks.k1.type = hdfs
    a.sinks.k1.hdfs.path = /data/flume/hbase_log
    # 指定文件名前缀
    a.sinks.k1.hdfs.filePrefix = hbaselog
    # 指定达到多少数据量写一次文件 单位:bytes
    a.sinks.k1.hdfs.rollSize = 102400
    # 指定多少条写一次文件
    a.sinks.k1.hdfs.rollCount = 1000
    # 指定文件类型为 流 来什么输出什么
    a.sinks.k1.hdfs.fileType = DataStream
    # 指定文件输出格式 为text
    a.sinks.k1.hdfs.writeFormat = text
    # 指定文件名后缀
    a.sinks.k1.hdfs.fileSuffix = .txt
    ​
    #指定channel
    a.channels.c1.type = memory 
    a.channels.c1.capacity = 1000
    # 表示sink每次会从channel里取多少数据
    a.channels.c1.transactionCapacity = 100
    ​
    # 组装
    a.sources.r1.channels = c1 
    a.sinks.k1.channel = c1
  • hbaselogToHBase

    • 在hbase中创建log表

    create 'log','cf1'
    • 配置文件

    # a表示给agent命名为a
    # 给source组件命名为r1
    a.sources = r1
    # 给sink组件命名为k1
    a.sinks = k1 
    # 给channel组件命名为c1
    a.channels = c1
    ​
    #指定exec的属性
    a.sources.r1.type = exec 
    a.sources.r1.command = tail -200f /usr/local/soft/hbase-2.2.7/logs/hbase-root-master-master.log
    ​
    #指定sink的类型
    a.sinks.k1.type = hbase2
    a.sinks.k1.table = hbase_log
    a.sinks.k1.columnFamily = info
    ​
    #指定channel
    a.channels.c1.type = memory 
    a.channels.c1.capacity = 100000
    # 表示sink每次会从channel里取多少数据
    a.channels.c1.transactionCapacity = 100
    ​
    # 组装
    a.sources.r1.channels = c1 
    a.sinks.k1.channel = c1
    # 创建hbase表
    create 'hbase_log','info'
  • netcatLogger

    监听telnet端口

    • 安装telnet

    yum install telnet
    • 配置文件

    # a表示给agent命名为a
    # 给source组件命名为r1
    a.sources = r1
    # 给sink组件命名为k1
    a.sinks = k1 
    # 给channel组件命名为c1
    a.channels = c1
    ​
    #指定netcat的属性
    a.sources.r1.type = netcat 
    a.sources.r1.bind = 0.0.0.0 
    a.sources.r1.port = 8888 
    ​
    #指定sink的类型
    a.sinks.k1.type = logger
    ​
    #指定channel
    a.channels.c1.type = memory 
    a.channels.c1.capacity = 1000
    # 表示sink每次会从channel里取多少数据
    a.channels.c1.transactionCapacity = 100
    ​
    # 组装
    a.sources.r1.channels = c1 
    a.sinks.k1.channel = c1
    • 启动

      • 先启动agent

      flume-ng agent -n a -f ./netcatToLogger.conf -Dflume.root.logger=DEBUG,console
      • 在启动telnet

      telnet master 8888
  • httpToLogger

    • 配置文件

    # a表示给agent命名为a
    # 给source组件命名为r1
    a.sources = r1
    # 给sink组件命名为k1
    a.sinks = k1 
    # 给channel组件命名为c1
    a.channels = c1
    ​
    #指定http的属性
    a.sources.r1.type = http
    a.sources.r1.port = 6666 
    ​
    #指定sink的类型
    a.sinks.k1.type = logger
    ​
    #指定channel
    a.channels.c1.type = memory 
    a.channels.c1.capacity = 1000
    # 表示sink每次会从channel里取多少数据
    a.channels.c1.transactionCapacity = 100
    ​
    # 组装
    a.sources.r1.channels = c1 
    a.sinks.k1.channel = c1
  
* 启动* 先启动agent
flume-ng agent -n a -f ./httpToLogger.conf -Dflume.root.logger=DEBUG,console
  * 再使用curl发起一个http请求
curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "数加学院"}]' http://master:6666
```
6、多sink

vim netcat_to_hdfs.conf

# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1 k2
# 给channel组件命名为c1
a.channels = c1
​
#指定netcat的属性
a.sources.r1.type = netcat 
a.sources.r1.bind = 0.0.0.0 
a.sources.r1.port = 8888 
​
#指定sink的类型
a.sinks.k1.type = logger
​
​
#指定sink的类型
a.sinks.k2.type = hdfs
a.sinks.k2.hdfs.path = /data/flume/netcat
# 指定文件名前缀
a.sinks.k2.hdfs.filePrefix = hbaselog
# 指定达到多少数据量写一次文件 单位:bytes
a.sinks.k2.hdfs.rollSize = 102400
# 指定多少条写一次文件
a.sinks.k2.hdfs.rollCount = 1000
# 指定文件类型为 流 来什么输出什么
a.sinks.k2.hdfs.fileType = DataStream
# 指定文件输出格式 为text
a.sinks.k2.hdfs.writeFormat = text
# 指定文件名后缀
a.sinks.k2.hdfs.fileSuffix = .txt
​
#指定channel
a.channels.c1.type = memory 
a.channels.c1.capacity = 1000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100
​
# 组装
a.sources.r1.channels = c1 
a.sinks.k1.channel = c1
a.sinks.k2.channel = c1
7、多Agent

vim flume_hbase_regionserver_node1.conf

# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1 
# 给channel组件命名为c1
a.channels = c1
​
#指定exec的属性
a.sources.r1.type = exec 
a.sources.r1.command = tail -200f /usr/local/soft/hbase-2.2.7/logs/hbase-root-regionserver-node1.log
​
#指定sink的类型
a.sinks.k1.type = avro
a.sinks.k1.hostname = master
a.sinks.k1.port = 7777
​
#指定channel
a.channels.c1.type = memory 
a.channels.c1.capacity = 100000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100
​
# 组装
a.sources.r1.channels = c1 
a.sinks.k1.channel = c1

vim flume_hbase_regionserver_node2.conf

# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1 
# 给channel组件命名为c1
a.channels = c1
​
#指定exec的属性
a.sources.r1.type = exec 
a.sources.r1.command = tail -200f /usr/local/soft/hbase-2.2.7/logs/hbase-root-regionserver-node2.log
​
#指定sink的类型
a.sinks.k1.type = avro
a.sinks.k1.hostname = master
a.sinks.k1.port = 7777
​
#指定channel
a.channels.c1.type = memory 
a.channels.c1.capacity = 100000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100
​
# 组装
a.sources.r1.channels = c1 
a.sinks.k1.channel = c1

vim flume_data_to_hdfs_master.conf

# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1 
# 给channel组件命名为c1
a.channels = c1
​
#指定exec的属性
a.sources.r1.type = avro 
a.sources.r1.bind = 0.0.0.0 
a.sources.r1.port = 7777
​
#指定sink的类型
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = /data/flume/regionserver/log
# 指定文件名前缀
a.sinks.k1.hdfs.filePrefix = hbaselog
# 指定达到多少数据量写一次文件 单位:bytes
a.sinks.k1.hdfs.rollSize = 102400
# 指定多少条写一次文件
a.sinks.k1.hdfs.rollCount = 1000
# 指定文件类型为 流 来什么输出什么
a.sinks.k1.hdfs.fileType = DataStream
# 指定文件输出格式 为text
a.sinks.k1.hdfs.writeFormat = text
# 指定文件名后缀
a.sinks.k1.hdfs.fileSuffix = .txt
​
#指定channel
a.channels.c1.type = memory 
a.channels.c1.capacity = 1000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100
​
# 组装
a.sources.r1.channels = c1 
a.sinks.k1.channel = c1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/457178.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android 13 SystemUI 隐藏下拉快捷面板部分模块(wifi,bt,nfc等)入口

frameworks/base/packages/SystemUI/src/com/android/systemui/qs/tileimpl/QSFactoryImpl.java createTileInternal(tileSpec)方法注释想隐藏的模块即可。

【C++进阶篇】——STL的简介

【C进阶篇】——STL的简介 1.什么是STL STL(standard template libaray-标准模板库):是C标准库的重要组成部分,不仅是一个可复用的组件库,而且是一个包罗数据结构与算法的软件框架。 2.STL的版本 原始版本 Alexander Stepanov、Meng Lee 在…

redis集群配置

一、Redis集群的三种方式 Redis集群提供了三种分布式方案:主从模式:一个主节点和一个或多个从节点,主节点负责写操作,从节点负责读操作,实现读写分离,分担主节点的压力。哨兵模式:哨兵系统用于监…

【每日一题】LeetCode - 盛最多水的容器

给定一个长度为 n 的整数数组 height。有 n 条垂线,第 i 条线的两个端点是 (i, 0) 和 (i, height[i])。要求找出其中的两条线,使得它们与 x 轴共同构成的容器可以容纳最多的水。 输入示例: height [1,8,6,2,5,4,8,3,7]输出: 4…

Python依赖库的几种离线安装方法

Python依赖库的几种安装方法 python经常需要安装一些依赖库,但是有时候环境可以连通python源,有时不能连通需要离线安装(安装单个库包或者整个库环境),使用pip的如下方法可以相对简单解决问题。 一、如何copy一个pyt…

Linux 端口占用 kill被占用的端口 杀掉端口

1、yum install lsof 2、输入netstat -tln,查看系统当前所有被占用端口 3、根据端口查询进程,输入lsof -i :9555,切记不要忘了添加冒号 4、 既然知道进程号了,那杀死当前进程就简单多了,直接 kill -9 PID 回车

如何通过企业架构蓝图引导企业实现数字化转型:构建与实施的全方位指南

在当今迅速变化的商业环境中,企业进行数字化转型已成为提升竞争力、优化运营的必要手段。企业架构蓝图(EA Blueprint)作为指导企业数字化转型的战略工具,不仅提供了系统化的设计和规划路径,还帮助企业在技术与业务目标…

【读书笔记·VLSI电路设计方法解密】问题26:什么是漏电流问题

功耗现已成为半导体行业面临的主要技术难题。在当前基于CMOS的VLSI电路中,有两种主要的功耗来源:动态功耗和静态功耗。动态功耗来源于晶体管的切换以及芯片上数百万逻辑门输出端的电容反复充电和放电,是芯片为产生有效输出所消耗的能量。静态功耗则指即使在晶体管关闭时也会…

法治在沃刷积分-刷文章浏览数

最近有一个任务,需要通过浏览文章来获取积分,一个个手点文章太麻烦,专业的事情还得专业的来。 法1:模拟发包 抓包发现,是通过接口来使积分增长,那直接模拟发包即可。 至于info_id的获取,可以通…

2024年全球 MoonBit 编程创新赛-零基础早鸟教程-使用wasm4八小时开发井子棋小游戏

前言 本篇文章主要分享 “2024年全球 MoonBit 编程创新赛 游戏赛道”参赛过程中九宫棋游戏的开发技巧和心得。以此抛砖引玉。首先介绍下 MoonBit。 月兔语言 MoonBit 是一个用于云计算和边缘计算的 WebAssembly 端到端的编程语言工具链。 您可以访问 https://try.moonbitlang.…

文本预处理操作简述

自然语言处理 (NLP) 是数据科学的一个分支,主要处理文本数据。除了数值数据外,文本数据也广泛可用,用于分析和解决业务问题。然而,在使用数据进行分析或预测之前,处理数据非常重要。 我们执行文本预处理来准备用于模型…

mysql的卸载与安装

一、mysql的卸载 1、用管理员模式的打开cmd,我的服务名是mysql。 net stop 【你的服务名】 sc delete 【你的服务名】 2、将下图中有包含‘bin’目录,‘data’目录等等的这个总目录删掉 如图我的目录是:mysql-5.7.28-winx64 3、删除mysql的隐…

代码随想录算法训练营Day39 | 卡玛网-46.携带研究材料、416. 分割等和子集

目录 卡玛网-46.携带研究材料 416. 分割等和子集 卡玛网-46.携带研究材料 题目 卡玛网46. 携带研究材料(第六期模拟笔试) 题目描述: 小明是一位科学家,他需要参加一场重要的国际科学大会,以展示自己的最新研究成…

day3:管道,解压缩,vim

一,管道(|) 引入 当我们要将本次命令结果作为下次命令参数时就可以用到,极大的简化了操作。 比如:head -5 文件| tail -1:表示显示第五行这就是管道的魅力 概述 管道符:| 作用&#xff1a…

【论文阅读】ESRGAN+

学习资料 论文题目:进一步改进增强型超分辨率生成对抗网络(ESRGAN : FURTHER IMPROVING ENHANCED SUPER-RESOLUTION GENERATIVE ADVERSARIAL NETWORK)论文地址:2001.08073代码:ncarraz/ESRGANplus: ICASSP …

Android中的epoll机制

深入理解Android中的epoll机制 在Android系统中,epoll广泛用于高效管理网络和文件的I/O操作。它通过减少CPU资源消耗和避免频繁的内核态-用户态切换,实现了在多连接、多任务环境中的高性能。epoll的特性使其非常适合Android系统中网络服务器、Socket通信…

Android 15自定义设置导航栏与状态栏,EdgeToEdge适配

背景:android api 35,activity设置EdgeToEdge.enable((ComponentActivity) this)前提下 一、设置导航栏与状态栏颜色 设置的状态栏颜色,只需要设置fitsSystemWindows跟setOnApplyWindowInsetsListener xml设置: 代码:…

比例数据可视化(Python实现板块层级图绘制)——Instacart Market Basket Analysis

【实验名称】 实验一:绘制板块层级图 【实验目的】 1. 掌握数据文件读取 2. 掌握数据处理的方法 3. 实现板块层级图的绘制 【数据介绍】Instacart Market Basket Analysis 1. 数据说明 数据共有300 0000orders, 20 0000users, …

logback日志脱敏后异步写入文件

大家项目中肯定都会用到日志打印,目的是为了以后线上排查问题方便,但是有些企业对输出的日志包含的敏感(比如:用户身份证号,银行卡号,手机号等)信息要进行脱敏处理。 哎!我们最近就遇到了日志脱敏的改造。可…

使用text-embedding-3-small生成向量并将向量插入Mlivus Cloud用于语义搜索的深度解析与实战操作

使用text-embedding-3-small生成向量并将向量插入Mlivus Cloud用于语义搜索的深度解析与实战操作 在当今的大数据时代,文本数据的处理与分析显得尤为重要。如何高效地存储、查询和理解这些海量文本数据,成为了许多企业和研究机构面临的重大挑战。幸运的是,随着向量数据库技…