Installing and Using Flume

I. Install Flume

1. Download Flume 1.7.0

http://mirrors.shu.edu.cn/apache/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

2. Extract and rename

tar xvf apache-flume-1.7.0-bin.tar.gz
mv apache-flume-1.7.0-bin flume

II. Configure Flume

1. Create the environment script (flume-env.sh)

cp conf/flume-env.sh.template  conf/flume-env.sh
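
flume-env.sh is where the agent's JVM settings live. A minimal sketch, assuming a JDK installed under /usr/local/jdk1.8.0_161 — adjust the paths to your environment:

# conf/flume-env.sh
export JAVA_HOME=/usr/local/jdk1.8.0_161    # hypothetical path - point at your own JDK
export JAVA_OPTS="-Xms512m -Xmx1024m"       # optional heap settings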

2. Create the configuration file (flume.conf)

cp conf/flume-conf.properties.template conf/flume.conf

III. Write a file's contents to the log

1. Configure where the data comes from and where it goes

# source: avro, channel: memory, sink: logger

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1

2. Start the Flume agent

bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n agent1
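
Here --conf points at the directory holding flume-env.sh and log4j.properties, -f selects the agent configuration file, -Dflume.root.logger=DEBUG,console sends the agent's log output to the terminal, and -n must match the agent name used inside the config file (agent1 above).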

3. Start the Flume client

echo hello, world! > /usr/local/flume-test.log
echo hello, hadoop! >> /usr/local/flume-test.log
echo hello, flume! >> /usr/local/flume-test.log
echo hello, spark! >> /usr/local/flume-test.log

bin/flume-ng avro-client --conf conf -H localhost -p 41414 -F /usr/local/flume-test.log -Dflume.root.logger=DEBUG,console
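
The avro-client reads the file passed with -F and sends each line as one Avro event to the host and port given by -H and -p — four lines here, so the server should report a batch of four events.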

4. Client output

2018-05-11 11:24:05,624 (main) [DEBUG - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:498)] Batch size string = 5
2018-05-11 11:24:05,644 (main) [WARN - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:634)] Using default maxIOWorkers
2018-05-11 11:24:06,100 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:233)] Finished
2018-05-11 11:24:06,100 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:236)] Closing reader
2018-05-11 11:24:06,101 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:240)] Closing RPC client
2018-05-11 11:24:06,107 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.main(AvroCLIClient.java:84)] Exiting

5. Flume server output

2018-05-11 11:24:05,772 (New I/O server boss #3) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xce7f8eac, /127.0.0.1:52790 => /127.0.0.1:41414] OPEN
2018-05-11 11:24:05,773 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xce7f8eac, /127.0.0.1:52790 => /127.0.0.1:41414] BOUND: /127.0.0.1:41414
2018-05-11 11:24:05,773 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xce7f8eac, /127.0.0.1:52790 => /127.0.0.1:41414] CONNECTED: /127.0.0.1:52790
2018-05-11 11:24:06,076 (New I/O worker #1) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:378)] Avro source avro-source1: Received avro event batch of 4 events.
2018-05-11 11:24:06,104 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xce7f8eac, /127.0.0.1:52790 :> /127.0.0.1:41414] DISCONNECTED
2018-05-11 11:24:06,104 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xce7f8eac, /127.0.0.1:52790 :> /127.0.0.1:41414] UNBOUND
2018-05-11 11:24:06,104 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xce7f8eac, /127.0.0.1:52790 :> /127.0.0.1:41414] CLOSED
2018-05-11 11:24:06,104 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:209)] Connection to /127.0.0.1:52790 disconnected.
2018-05-11 11:24:08,870 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 2C 20 77 6F 72 6C 64 21          hello, world! }
2018-05-11 11:24:08,870 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 2C 20 68 61 64 6F 6F 70 21       hello, hadoop! }
2018-05-11 11:24:08,870 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 2C 20 66 6C 75 6D 65 21          hello, flume! }
2018-05-11 11:24:08,871 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 2C 20 73 70 61 72 6B 21          hello, spark! }

As the output shows, the server received the data written by the client.
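
Note that the logger sink only prints the first 16 bytes of each event body by default; if you test with longer lines and they appear truncated, you can raise the limit:

agent1.sinks.log-sink1.maxBytesToLog = 256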

IV. Monitor a directory with Flume and upload new files to HDFS

1. Configuration

# Upload text files from /usr/local/flume/tmpData to hdfs://node10:9000/test/
# source: spooldir, channel: memory, sink: hdfs

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory

# Define a spooldir source called spooldir-source1 on agent1 that
# watches /usr/local/flume/tmpData. Connect it to channel ch1.
agent1.sources.spooldir-source1.channels = ch1
agent1.sources.spooldir-source1.type = spooldir
agent1.sources.spooldir-source1.spoolDir = /usr/local/flume/tmpData
# Add the source file name to each event's headers, e.g. headers:{basename=a.txt}
agent1.sources.spooldir-source1.basenameHeader = true

# Define an HDFS sink and connect it to the other end of the same channel.
# Output files are named like a.txt.1536559992413
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://node10:9000/test
# hdfs.filePrefix = log_%Y%m%d_%H would instead give names like log_20151016_13.1444973768543
agent1.sinks.hdfs-sink1.hdfs.filePrefix = %{basename}
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundValue = 10
# Disable size- and count-based file rolling (0 = never roll on that criterion)
agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = spooldir-source1
agent1.sinks = hdfs-sink1
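
By default the HDFS sink writes SequenceFiles, so the uploaded files are not byte-for-byte copies of the originals. If you want plain text output, also set (as the collector configuration in section V does):

agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text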

2. Start the agent

bin/flume-ng agent --conf ./conf/ -f ./conf/flume.conf --name agent1 -Dflume.root.logger=DEBUG,console

3. Check HDFS

http://node10:50070 (replace node10 with the node's IP address)

4. Files in the monitored directory

After each file has been fully read, it is renamed with the .COMPLETED suffix; Flume skips these files when it restarts.
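
To test, drop a file into the spool directory and list the target HDFS path (a.txt is just a placeholder name):

cp a.txt /usr/local/flume/tmpData/
hdfs dfs -ls hdfs://node10:9000/test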

V. Multiple agents with multiple collectors

1. Flume agent configuration

# flume-client.properties
# source: exec tail of a log on node1, channel: memory, sinks: avro to node2 and node3

# agent1 component names
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2

# set sink group
agent1.sinkgroups = g1

# set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /usr/local/hadoop/logs/hadoop-root-namenode-node1.log

agent1.sources.r1.interceptors = i1 i2
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = Type
agent1.sources.r1.interceptors.i1.value = LOGIN
agent1.sources.r1.interceptors.i2.type = timestamp

# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node2
agent1.sinks.k1.port = 52020

# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node3
agent1.sinks.k2.port = 52020

# set sink group membership
agent1.sinkgroups.g1.sinks = k1 k2

# failover: k1 (priority 10) is preferred over k2 (priority 1)
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000
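
With the failover processor, only one sink in the group is active at a time: events go to the live sink with the highest priority (k1 here), and a failed sink is penalized with a growing backoff, capped at maxpenalty milliseconds, before it is retried.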

2. Flume collector configuration

# flume-server.properties
# set agent component names
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# on the other node, change node2 to node3
a1.sources.r1.type = avro
a1.sources.r1.bind = node2
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
a1.sources.r1.interceptors.i1.value = NNA
a1.sources.r1.channels = c1

# set sink to hdfs
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/logdfs
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 1
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d
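
Note that hdfs.rollInterval is in seconds, so a value of 1 rolls a new HDFS file every second — convenient for watching the demo work, but it produces many small files, and you would raise it in production.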

3. Start the agent

flume-ng agent -n agent1 -c conf -f conf/flume-client.properties -Dflume.root.logger=DEBUG,console

4. Start the collectors (on node2 and node3)

flume-ng agent -n a1 -c conf -f conf/flume-server.properties -Dflume.root.logger=DEBUG,console

5. Failover test

Collector 1 (k1) has a higher priority than collector 2.
Kill collector 1, and collector 2 takes over.
Bring collector 1 back up, and collector 1 resumes handling the traffic.
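
One way to drive the test, assuming each collector runs as its own Flume JVM (jps lists it as Application; the pid is a placeholder):

# on node2: find and kill the collector process
jps | grep Application
kill -9 <pid>
# the agent log on node1 shows k1 failing and traffic switching to k2 (node3);
# restart the collector on node2 and the higher-priority k1 takes over again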

6. Load balancing

# Load balancing: replaces the failover processor settings above
#agent1.sinkgroups.g1.processor.type = load_balance
#agent1.sinkgroups.g1.processor.backoff = true
#agent1.sinkgroups.g1.processor.selector = round_robin
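
With load_balance, events are spread across k1 and k2 (round_robin here; random is the other built-in selector), and backoff = true temporarily blacklists a failing sink so it is not retried on every event.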

VI. Integrating Flume with Kafka

# set the Kafka sink
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker addresses and ports
a1.sinks.k2.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
#a1.sinks.k2.defaultPartitionId = 0
# legacy producer properties carried over from older examples; recent Flume
# versions pass producer options via kafka.producer.* keys instead
a1.sinks.k2.producer.type = sync
# Kafka topic to write to
a1.sinks.k2.kafka.topic = TestTopic
# serialization
a1.sinks.k2.serializer.class = kafka.serializer.StringEncoder
a1.sinks.k2.channel = c1
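
To verify that events are arriving, read the topic back with Kafka's console consumer (run from the Kafka installation directory; adjust the broker address to your cluster):

bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic TestTopic --from-beginning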

VII. Passing Flume headers to Spark

# With useFlumeEventFormat = true the Kafka sink writes each event as an
# Avro-serialized AvroFlumeEvent instead of the bare body, so the Flume
# headers survive the trip through Kafka.
#a1.sinks.k2.serializer.class = org.apache.kafka.common.serialization.BytesDeserializer
a1.sinks.k2.useFlumeEventFormat = true

On the Spark Streaming side, consume the topic as raw bytes and decode each record back into an AvroFlumeEvent. A sketch, assuming the spark-streaming-kafka-0-10 and flume-ng-sdk dependencies and an existing StreamingContext ssc:

import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.util.Utf8
import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.kafka.common.serialization.{BytesDeserializer, StringDeserializer}
import org.apache.kafka.common.utils.Bytes
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "192.168.60.15:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[BytesDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean))

val topics = Array("sspclick")
val sspclick = KafkaUtils.createDirectStream[String, Bytes](
  ssc,
  PreferConsistent,
  Subscribe[String, Bytes](topics, kafkaParams))

// Reader that turns the raw Avro bytes back into AvroFlumeEvent objects
val reader = new SpecificDatumReader[AvroFlumeEvent](classOf[AvroFlumeEvent])

val a = sspclick.map(f => {
  val body = f.value().get                                    // raw Avro bytes
  val decoder = DecoderFactory.get().binaryDecoder(body, null)
  val result = reader.read(null, decoder)                     // decoded Flume event
  val hostname = result.getHeaders.get(new Utf8("hostname"))  // a Flume header
  val text = new String(result.getBody.array())
  val array = text.split("\t")
  (hostname, array)                                           // e.g. pair the header with the parsed fields
})
