(二十一)大数据实战——Flume数据采集之复制和多路复用案例实战

前言

本节内容我们完成Flume数据采集的一个多路复用案例,使用三台服务器,一台服务器负责采集本地日志数据,通过使用Replicating ChannelSelector选择器,将采集到的数据分发到另外俩台服务器,一台服务器将数据存储到hdfs,另外一台服务器将数据存储在本机,使用Avro的方式完成flume之间采集数据的传输。整体架构如下:

正文

①在hadoop101服务器的/opt/module/apache-flume-1.9.0/job目录下创建job-file-flume-avro.conf配置文件,用于监控hive日志并传输到avro sink

- job-file-flume-avro.conf配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有 channel
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /tmp/hadoop/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
# sink 端的 avro 是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

②在hadoop102服务器的/opt/module/apache-flume-1.9.0/job目录下创建job-avro-flume-hdfs.conf配置文件,将监控数据传输到hadoop的hdfs系统

- job-avro-flume-hdfs.conf配置文件

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
# source 端的 avro 是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop101:8020/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k1.hdfs.rollCount = 0
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

③在hadoop103服务器的/opt/module/apache-flume-1.9.0/job目录下创建job-avro-flume-dir.conf配置文件,将监控数据传输到/opt/module/apache-flume-1.9.0/flume3目录下

- job-avro-flume-dir.conf配置文件

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop103
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/apache-flume-1.9.0/flume3
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

- 创建数据存储目录/opt/module/apache-flume-1.9.0/flume3

④启动hadoop集群

 

⑤启动hadoop102上的flume任务job-avro-flume-hdfs.conf

- 命令:

bin/flume-ng agent -c conf/ -n a2 -f job/job-avro-flume-hdfs.conf -Dflume.root.logger=INFO,console

 ⑥启动hadoop103上的flume任务job-avro-flume-dir.conf

- 命令:

bin/flume-ng agent -c conf/ -n a3 -f job/job-avro-flume-dir.conf -Dflume.root.logger=INFO,console

⑦启动hadoop101上的flume任务job-file-flume-avro.conf

- 命令:

bin/flume-ng agent -c conf/ -n a1 -f job/job-file-flume-avro.conf -Dflume.root.logger=INFO,console

⑧启动hive

 ⑨查看监控结果

- 查看hdfs

- 查看存储目录/opt/module/apache-flume-1.9.0/flume3下的文件

结语

至此,关于Flume数据采集之复制和多路复用案例实战到这里就结束了,我们下期见。。。。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/122751.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

pytorch-神经网络-手写数字分类任务

Mnist分类任务: 网络基本构建与训练方法,常用函数解析 torch.nn.functional模块 nn.Module模块 读取Mnist数据集 会自动进行下载 %matplotlib inlinefrom pathlib import Path import requestsDATA_PATH Path("data") PATH DATA_PATH / &…

使用 Nacos 在 Spring Boot 项目中实现服务注册与配置管理

🌷🍁 博主猫头虎(🐅🐾)带您 Go to New World✨🍁 🦄 博客首页——🐅🐾猫头虎的博客🎐 🐳 《面试题大全专栏》 🦕 文章图文…

每日刷题(回溯法经典问题之子集)

食用指南:本文为作者刷题中认为有必要记录的题目 前置知识:回溯法经典问题之组合 ♈️今日夜电波:想着你—郭顶 1:09 ━━━━━━️💟──────── 4:15 …

jmeter setUp Thread Group

SetUp Thread Group 是一种特殊类型的线程组,它用于在主测试计划执行之前执行一些初始化任务。 SetUp Thread Group 通常用于以下几种情况: 用户登录:在模拟用户执行实际测试之前,模拟用户登录到系统以获取访问权限。 创建会话&a…

freertos之资源管理

中断屏蔽 屏蔽中断函数 在任务中使用 taskENTER_CRITICA()/taskEXIT_CRITICAL() 在中断中使用 taskENTER_CRITICAL_FROM_ISR()/taskEXIT_CRITICAL_FROM_ISR() 功能介绍 使用上述函数,进入临界中断,任务不会切换,且中断优先级处于con…

Ubuntu入门04——目录与文件

目录 1.显示当前工作目录 2.更改目录 3.创建工作目录 4.删除工作目录 5.移动文件或者文件夹 6.文件夹and文件查看命令 7. 回到根目录,回到上一级 8.删除工作目录 9.查看目录和文件 10.以树状图列出目录内容 11.文件查找 12.在数据库中查找文件或目录 1…

【大数据】Apache Iceberg 概述和源代码的构建

Apache Iceberg 概述和源代码的构建 1.数据湖的解决方案 - Iceberg1.1 Iceberg 是什么1.2 Iceberg 的 Table Format 介绍1.3 Iceberg 的核心思想1.4 Iceberg 的元数据管理1.5 Iceberg 的重要特性1.5.1 丰富的计算引擎1.5.2 灵活的文件组织形式1.5.3 优化数据入湖流程1.5.4 增量…

[移动通讯]【Carrier Aggregation-3】【5G】

前言: 参考: 5G Mobile Communications:《Carrier Aggregation in 5G》 目录: 1: carrier Allocation Schemes 2: 网络结构 3: LTE CA 4: 5G CA 一 Carrier Allocation Schemes CA 主要作用…

数学建模--非整数规划求解的Python实现

目录 1.算法流程简介 2.算法核心代码 3.算法效果展示 1.算法流程简介 #非线性规划模型求解: #我们采用通用的minimize函数来求解 #minimize(f,x,method,bounds,contrains) #f是待求函数 #x是代求的自变量 #method是求解方法 #bounds是取值范围边界 #contrains是约束条件 &q…

Java HashMap

简介 HashMap 是一个散列表,它存储的内容是键值对(key-value)映射。 HashMap 实现了 Map 接口,根据键的 HashCode 值存储数据,具有很快的访问速度,最多允许一条记录的键为 null,不支持线程同步。 HashMap 是无序的&…

新材料行业CRM客户管理系统的选择

新材料行业虽属小众细分市场,但其应用极广。更广阔的市场也意味着更激烈的竞争。新材料行业巨头林立,企业如何能脱颖而出?这里有一个适合新材料行业CRM系统推荐—Zoho CRM,帮助新材料企业推动业务高效运转的新模式。 数字化是企业…

Python爬虫乱码问题之encoding和apparent_encoding的区别

encoding是从http中的header中的charset字段中提取的编码方式,若header中没有charset字段则默认为ISO-8859-1编码模式,则无法解析中文,这是乱码的原因 apparent_encoding会从网页的内容中分析网页编码的方式,所以apparent_encodi…

Ubuntu之apt-get系列--apt-get安装软件的方法/教程

原文网址&#xff1a;Ubuntu之apt-get系列--apt-get安装软件的方法/教程_IT利刃出鞘的博客-CSDN博客 简介 本文介绍Ubuntu使用apt-get安装软件的方法。 安装软件 先更新列表 sudo apt-get update 安装软件 sudo apt-get install <package name>[<version>]…

windows查看端口占用,通过端口找进程号(查找进程号),通过进程号定位应用名(查找应用)(netstat、tasklist)

文章目录 通过端口号查看进程号netstat通过进程号定位应用程序tasklist 通过端口号查看进程号netstat 在Windows系统中&#xff0c;可以使用 netstat 命令来查看端口的占用情况。以下是具体的步骤&#xff1a; 打开命令提示符&#xff08;CMD&#xff09;&#xff1a;按WinR组…

绘制钻头芯厚变化图

import numpy as np import matplotlib.pyplot as plt posnp.array([0.05,0.5,0.97,3]) data_m1np.array([0.088,0.093,0.098,0.116]) data_m2data_m1-0.01 data_m3data_m1-0.02 fig plt.figure(figsize(5, 4)) plt.rcParams[xtick.direction] in # 将x周的刻度线方向设置向…

向量数据库Annoy和Milvus

Annoy 和 Milvus 都是用于向量索引和相似度搜索的开源库&#xff0c;它们可以高效地处理大规模的向量数据。 Annoy&#xff08;Approximate Nearest Neighbors Oh Yeah&#xff09;&#xff1a; Annoy 是一种近似最近邻搜索算法&#xff0c;它通过构建一个树状结构来加速最近…

并行和并发的区别

从操作系统的角度来看&#xff0c;线程是CPU分配的最小单位。 并行就是同一时刻&#xff0c;两个线程都在执行。这就要求有两个CPU去分别执行两个线程。并发就是同一时刻&#xff0c;只有一个执行&#xff0c;但是一个时间段内&#xff0c;两个线程都执行了。并发的实现依赖于…

旅游APP外包开发注意事项

旅游类APP通常具有多种功能&#xff0c;以提供给用户更好的旅行体验。以下分享常见的旅游类APP功能以及在开发和使用这些APP时需要注意的问题&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎交流合作。 常见功能…

【HCIE】01.IGP高级特性

高级特性&#xff1a;一条命令解决一个问题 OSPF快速收敛机制 发生故障重新计算拓扑的过程叫做收敛&#xff0c;设备现在本身就是PRC算法和I-SPF算法 PRC&#xff08;针对叶子节点&#xff0c;叶子代表路由&#xff09; 不需要命令配置&#xff0c;就是ospf的特性&#xff…

LeetCode —— 复写零(双指针)

题目链接 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 题目解析 将数组中出现的每个零复写一遍&#xff0c;然后将其他元素向右平移&#xff0c;数组长度不能改变。 法一&#xff1a;使用额外空间的做法 class Solution { public:void duplica…