Flume——进阶(agent特性+三种结构:串联,多路复用,聚合)

目录

  • agent特性
    • ChannelSelector
      • 描述:
    • SinkProcessor
      • 描述:
  • 串联架构
    • 结构图解
    • 定义与描述
    • 配置示例
      • Flume1(监测端node1)
      • Flume3(接收端node3)
      • 启动方式
  • 复制和多路复用
    • 结构图解
    • 定义描述
    • 配置示例
      • node1
      • node2
      • node3
      • 启动方式
  • 聚合架构
    • 结构图解
    • 定义描述
    • 示例
      • node1
      • node2
      • node3


agent特性

在这里插入图片描述

ChannelSelector

ChannelSelector是Flume中的一个关键组件,负责根据特定逻辑决定Event的流向。

名称类型描述
ReplicatingSelectorChannelSelector类型将同一个Event复制并发往所有配置的Channel
MultiplexingSelectorChannelSelector类型根据预设的规则或条件,将不同的Event分发至不同的Channel

描述:

  • ReplicatingSelector会无条件地将每个Event发送到与其关联的所有Channel中,实现事件复制。
  • MultiplexingSelector则基于某种规则(如Event中的特定字段、时间戳等)来将Event分发到不同的Channel,实现事件的多路复用。

SinkProcessor

SinkProcessor是Flume中负责处理Sink中Event的组件,它决定了Event如何被发送和处理。

名称类型描述
DefaultSinkProcessorSinkProcessor类型对应于单个Sink,直接处理并发送Event至该Sink
LoadBalancingSinkProcessorSinkProcessor类型对应于Sink Group,实现负载均衡,将Event分发至多个Sink中处理
FailoverSinkProcessorSinkProcessor类型对应于Sink Group,提供错误恢复功能,当主Sink失败时自动切换至备用Sink

描述:

  • DefaultSinkProcessor是最基础的Sink处理器,直接与单个Sink关联,负责将Event发送至该Sink。
  • LoadBalancingSinkProcessor用于处理Sink Group,能够智能地将Event分发至多个Sink中,以实现负载均衡,提高处理效率。
  • FailoverSinkProcessor同样用于处理Sink Group,但它提供了错误恢复机制。当主Sink因故障无法工作时,它会自动将Event发送至备用Sink,以确保数据的连续性和可靠性。

串联架构

结构图解

在这里插入图片描述
在这里插入图片描述
Avro Sink作为Avro客户端,向Avro服务端发送Avro事件。它允许Flume Agent将数据以Avro格式序列化后,发送到指定的Avro Source或其他Avro客户端。

定义与描述

这种模式是将多个flume顺序连接起来了,从最初的source开始到最终sink传送的目的存储系统。此模式不建议桥接过多的flume数量, flume数量过多不仅会影响传输速率,而且一旦传输过程中某个节点flume宕机,会影响整个传输系统。

配置示例

Flume1(监测端node1)

Flume1(node1),监听node1上的44444端口(source),并输出到node3的10086端口上(sink)

a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = node1
# port,监听的端口
a1.sources.r1.port = 44444# Describe the sink
a1.sinks.k1.type = avro
# 指定 Avro Sink 发送数据的目标主机名和端口号
a1.sinks.k1.hostname = node3
a1.sinks.k1.port = 10086# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Flume3(接收端node3)

Flume3(node3),监听node3上的10086端口(source)(当然source内容是来自node1的44444端口的变化情况),输出一般的控制台内容

a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
# 监听的来自node3上的source,source类型为avro
a1.sources.r1.type = avro
a1.sources.r1.bind = node3
# port,监听的端口
a1.sources.r1.port = 10086# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动方式

先启动node3(flume3),node3的监听是串行的最后一环,从后向前依次启动
理由:
先启动node3的监听(此时node1还未启动),再启动node1,此时可以保证没有任何内容错过


复制和多路复用

结构图解

在这里插入图片描述

在这里插入图片描述

定义描述

Flume支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel中,或者将不同数据分发到不同的channel中,sink可以选择传送到不同的目的地。详细可以参考上面的Agent ChannelSelector和SinkProcessor

配置示例

此部分示例会按照如上的结构图进行配置

node1

replicating_channel.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 这个selector是复制类型的。
# 复制selector会将接收到的每个事件复制到所有配置的channel中。
a1.sources.r1.selector.type = replicating# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/nginx/logs/access.log
a1.sources.r1.shell = /bin/bash -c# Describe the sink
# avro类型的sink,发送给下一个agent
# sink k1的参数配置
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node2 
a1.sinks.k1.port = 10010# sink k2的参数配置
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node3
a1.sinks.k2.port = 10010# channel c1的参数配置
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# channel c2的参数配置
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

node2

接收node1,并输出到hdfs中,hdfs的参数配置:flume——hdfs

a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
# avro类型的source,接收来自上一个agent的sink输出
a2.sources.r1.type = avro
# 这个source来自于node2节点的10010端口
a2.sources.r1.bind = node2
a2.sources.r1.port = 10010# 传输至hdfs中
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = /flume2/%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 2
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

node3

接收node1,并输出到日志

a3.sources = r3
a3.sinks = k3
a3.channels = c3# Describe/configure the source
a3.sources.r3.type = avro
a3.sources.r3.bind = node3
a3.sources.r3.port = 10010# Describe the sink
a3.sinks.k3.type = logger# Describe the channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

启动方式

先启动node2(flume2)、node3(flume3),在启动node1(flume1)
理由:
同上,请注意,无论何种架构,都应到先启动最末端的接收,再启动发送


聚合架构

结构图解

在这里插入图片描述

在这里插入图片描述

定义描述

最常见实用的结构模式。
日常web应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用flume的这种组合方式能很好的解决这一问题,每台服务器部署一个flume采集日志,传送到一个集中收集日志的flume,再由此flume上传到hdfs、hive、hbase等,进行日志分析。

示例

node1

发送端1,输出到node3的10000端口
没什么需要特别注明的地方,关键节点已经在前面描述了,建议直接复制代码,GPT检查

[root@node1 jobs]# vim agg1.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/nginx/logs/access.log
a1.sources.r1.shell = /bin/bash -c# Describe the sink
# sink端的avro是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node3 
a1.sinks.k1.port = 10000# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

node2

发送端2,输出到node3的10000端口

a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
# source端的netcat是一个数据接收服务
a2.sources.r1.type = netcat
a2.sources.r1.bind = node2
a2.sources.r1.port = 10000# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = node3
a2.sinks.k1.port = 10000# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

node3

最末的接收端,监听10000端口即可,前面两个节点会发送内容到此端口

[root@node3 jobs]# vim agg3.conf
# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3# Describe/configure the source
a3.sources.r3.type = avro
a3.sources.r3.bind = node3
a3.sources.r3.port = 10000# Describe the sink
a3.sinks.k3.type = logger# Describe the channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/485611.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【python自动化一】pytest的基础使用

1.pytest简述 pytest‌ 是一个功能强大且灵活的Python测试框架,其主要是用于流程控制,具体用于UI还是接口自动化根据个人需要而定。且其具有丰富插件,使用时较为方便。咱们具体看下方的内容,本文按照使用场景展开,不完…

️ 在 Windows WSL 上部署 Ollama 和大语言模型的完整指南20241206

🛠️ 在 Windows WSL 上部署 Ollama 和大语言模型的完整指南 📝 引言 随着大语言模型(LLM)和人工智能的飞速发展,越来越多的开发者尝试在本地环境中部署大模型进行实验。然而,由于资源需求高、网络限制多…

buuctf:rar

根据题目所示,直接进行爆破 爆破后密码是8795,解压后得到flag flag{1773c5da790bd3caff38e3decd180eb7}

李飞飞空间智能来了:AI生成可探索交互的3D世界,颠覆游戏电影VR行业

目录 前言图生世界摄影效果景深效果滑动变焦 3D效果交互效果动画效果 走进大师的艺术工作流总结 前言 12月3日,有AI“教母”之称的李飞飞发布了空间智能的一个项目,一经发布就立刻引爆了外网。这个项目是仅仅通过一张图片,AI就可以快速的构建…

dockerfile部署前后端(vue+springboot)

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言0.环境说明和准备1.前端多环境打包1.1前端多环境设置1.2打包 2.后端项目多环境配置以及打包2.1后端多环境配置2.2项目打包 3.文件上传4.后端镜像制作4.1dockerf…

Numpy基础练习

import numpy as np 1.创建一个长度为10的一维全为0的ndarray对象,然后让第5个元素等于1 n np.zeros(10,dtypenp.int32) n[4] 12.创建一个元素从10到49的ndarray对象 n np.arrange(10,50)3.将第2题的所有元素位置反转 n[::-1]使用np.random.random创建一个10*10的ndarray对象…

MongoDB分片集群搭建及扩容

分片集群搭建及扩容 整体架构 环境准备 3台Linux虚拟机,准备MongoDB环境,配置环境变量。一定要版本一致(重点),当前使用 version4.4.9 配置域名解析 在3台虚拟机上执行以下命令,注意替换实际 IP 地址 e…

Java项目实战II基于微信小程序的亿家旺生鲜云订单零售系统的设计与实现(开发文档+数据库+源码)

目录 一、前言 二、技术介绍 三、系统实现 四、核心代码 五、源码获取 全栈码农以及毕业设计实战开发,CSDN平台Java领域新星创作者,专注于大学生项目实战开发、讲解和毕业答疑辅导。获取源码联系方式请查看文末 一、前言 随着移动互联网技术的不断…

数据结构与算法-03链表-03

递归与迭代 由一个问题引出 假设我们要计算 一个正整数的阶乘, N! 。 从数学上看 1! 1 2! 2 x 1 3! 3 x 2 x 1 4! 4 x 3 x 2 x 1 5! 5 x 4 x 3 x 2 x 1 : n! n x (n-1) x (n-2) x (n-3) x ... 1我们推出一般公式 f(1) 1 f(n) n * f(n-1…

Unity 设计模式-观察者模式(Observer Pattern)详解

观察者模式 观察者模式(Observer Pattern)是一种行为型设计模式,它定义了对象之间的一对多依赖关系。当一个对象的状态发生变化时,它的所有依赖者(观察者)都会收到通知并自动更新。这种模式用于事件处理系…

第四篇:k8s 理解Service工作原理

什么是service? Service是将运行在一组 Pods 上的应用程序公开为网络服务的抽象方法。 简单来说K8s提供了service对象来访问pod。我们在《k8s网络模型与集群通信》中也说过k8s集群中的每一个Pod(最小调度单位)都有自己的IP地址,都…

地瓜RDK X5上手ollama大模型测试

地瓜RDK X5上手ollama大模型测试 契机 ⚙ 上次逛ollama的时候发现有很多小参数的大模型,比如qwen2:0.5b,llama3.2:1b,甚至还有一个1.8b的多模态模型moondream,找公司1拿到一块RDK X5的开发板,官网查看算力可达10TOPS&#xff0c…

【Java】反射简介

框架的核心和架构师的核心 反射和代理是重中之重 反射 反射的作用 在运行的时候由代码获取类的信息 三种获取类信息的方式: 对象.getClass()Class.forName("类的路径")类.class Class :一个用来存储类信息的类 获取类信息是获取的整体的…

Windows电脑伪关机(快速启动模式),怎么真关机

Windows电脑在关机的时候,进入到一个伪关机的状态,也就是并没有真正的关机,但是在一些系统更新、变更了一些设置,进行重启等操作也会进入到真关机状态 这种一般是开启快速启动模式,开启了快速启动模式功能会在关机的时…

在c#控制台中使用Raylib-cs库,绘制控制小球和插入音频(附带c++中小球的控制代码)

下载网址 GitHub - chrisdill/raylib-cs: C# bindings for raylib, a simple and easy-to-use library to learn videogames programming 克隆库 克隆GitHub仓库-CSDN博客 1 .制作dll 点击 生成之后就会多出这些东西 2.在项目中添加dll 然后就导进来了 测试一下用例代码 …

【开源免费】基于Vue和SpringBoot的服装生产管理系统(附论文)

博主说明:本文项目编号 T 066 ,文末自助获取源码 \color{red}{T066,文末自助获取源码} T066,文末自助获取源码 目录 一、系统介绍二、演示录屏三、启动教程四、功能截图五、文案资料5.1 选题背景5.2 国内外研究现状5.3 可行性分析…

【漫话机器学习系列】Adaboost算法

Adaboost(Adaptive Boosting)是一种经典的集成学习方法,主要思想是通过将多个弱学习器(通常是简单模型,如决策树桩)加权组合,来提升整体模型的预测能力。Adaboost 是一种自适应的学习方法&#…

WebStorm快捷键保持跟Idea一致

修改连续行局部多选 在WebStorm中同时按下ctrl alt s; 选择KeyMap 输入Column Selection Mode选择快捷键, 右键选择Add Mouse Shortcut 按下alt 鼠标左键 如果出现占用的情况,直接删除其他使用该快捷键的地方即可; 修改跨行局部多选 在…

图的遍历之DFS邻接矩阵法

本题要求实现一个函数,对给定的用邻接矩阵存储的无向无权图,以及一个顶点的编号v,打印以v为起点的一个深度优先搜索序列。 当搜索路径不唯一时,总是选取编号较小的邻接点。 本题保证输入的数据(顶点数量、起点的编号等…

如何解决java.lang.UnsatisfiedLinkError:org.hyperic.sigar.ProcStat.gather问题

在新装的centos7.4服务器上部署部署应用系统,应用系统系统启动报错:“java.lang.UnsatisfiedLinkError:org.hyperic.sigar.ProcStat.gather” 一、报错分析 java.lang.UnsatisfiedLinkError通常是由于Java程序无法找到、加载或链接到所需的本地库而引发的…