2023_Spark_实验二十九:Flume配置KafkaSink

实验目的:掌握Flume采集数据发送到Kafka的方法

实验方法:通过配置Flume的KafkaSink采集数据到Kafka中

实验步骤:

一、明确日志采集方式

一般Flume采集日志source有两种方式:

1.Exec类型的Source

可以将命令产生的输出作为源,如:

a1.sources.r1.type = exec

a1.sources.r1.command = ping 10.3.1.227 //此处输入命令

2.Spooling Directory类型的 Source

将指定的文件加入到“自动搜集 ”目录中。flume会持续监听这个目录,把文件当做source来处理。注意:一旦文件被放到“自动收集”目录中后,便不能修改,如果修改,flume会报错。此外,也不能有重名的文件,如果有,flume也会报错。

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /home/work/data

向指定的文件目录下传送一个日志文件,发现flume的控制台打印相关的信息;此外,待收集的文件,会追加一个后缀:completed,表示已处理完。

3.确定采集策略:

采用exec方式采集数据

如果采用spooldir的方式来监控log文件夹,flume会采集log数据,flume会不断修改文件名,导致重复。

所以使用exec命令行的方式,通过tail -F *.log命令比较好!

注意: -F根据文件名进行追踪,并保持重试,即该文件被删除或改名后,如果再次创建相同的文件名,会继续追踪。 而-f根据文件的nodeid即文件描述符进行追踪,当文件改名或被删除,追踪停止 。

二、配置KafkaSink

Flume版本多,网上教程多,版本之间不兼容,推荐大家以Flume官网为准。

Exec Source

Kafka Sink

三、配置Flume配置文件

1. 拷贝一份配置文件模板

cp flume-conf.properties.template kafka.conf

2. 编辑kafka.conf

kafka.conf编辑内容如下

# 定义a2配置文件中每个组件的名称
a2.sources = execSrc
a2.channels = memoryChannel
a2.sinks = loggerSink# 配置source组件
# For each one of the sources, the type is defined
a2.sources.execSrc.type = exec
a2.sources.execSrc.command = tail -F /home/hadoop/scripts/realtime/realdata.log# 配置sink组件
# Each sink's type must be defined
a2.sinks.loggerSink.type = org.apache.flume.sink.kafka.KafkaSink
a2.sinks.loggerSink.kafka.topic = RealDataTopic
a2.sinks.loggerSink.kafka.bootstrap.servers = hd1:9092
a2.sinks.loggerSink.kafka.flumeBatchSize = 20
a2.sinks.loggerSink.kafka.producer.acks = 1
a2.sinks.loggerSink.kafka.producer.linger.ms = 1
a2.sinks.loggerSink.kafka.producer.compression.type = snappy# 配置缓存方式
# Each channel's type is defined.
a2.channels.memoryChannel.type = memory
a2.channels.memoryChannel.capacity = 1000
a2.channels.memoryChannel.transactionCapacity = 100# 配置source channel sink之间的连接关系
# The channel can be defined as follows.
a2.sources.execSrc.channels = memoryChannel
a2.sinks.loggerSink.channels = memoryChannel

3. 启动测试

/opt/module/apache-flume-1.9.0-bin/bin/flume-ng agent -c conf -f /opt/module/apache-flume-1.9.0-bin/conf/kafka.conf -n a2 -Dflume.root.logger=INFO,console

实验结果:配置kafkaSink成功,配置source为exec读取shell脚本模拟产生的实时数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/220464.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基础算法(5):滑动窗口

1.何为滑动窗口? 滑动窗口其实也是一种算法,主要有两类:一类是固定窗口,一类是可变窗口。固定的窗口只需要一个变量记录,而可变窗口需要两个变量。 2.固定窗口 就像上面这个图一样。两个相邻的长度为4的红色窗口&…

Axure中动态面板使用及轮播图多种登录方式左侧导航栏之案列

🎬 艳艳耶✌️:个人主页 🔥 个人专栏 :《产品经理如何画泳道图&流程图》 ⛺️ 越努力 ,越幸运 目录 一、轮播图简介 1、什么是轮播图 2、轮播图有什么作用 3、轮播图有什么特点 4、轮播图适应范围 5、…

智能 GPT 图书馆又重生了

智能 GPT 图书馆又重生了 作者:程序员小白条 1)概述 自从大二寒假准备开始筹备这个项目,到现在已经一年了,这个项目能维护一年,不愧是我.jpg。本来这个项目只是想练练手,因为那时候刚学完 Spring Boot2 V…

Elasticsearch安装部署

Elasticsearch安装部署 1.下载elasticsearch安装包:Elasticsearch 2.4.6 | Elastic 下载中文分词器:Release v1.10.6 medcl/elasticsearch-analysis-ik GitHub 2.安装elasticsearch rpm -ivh elasticsearch-2.4.6.rpm 3.安装中文分词器插件 首先在…

20.链路聚合二层

链路聚合二层 大意:拉多根线,捆绑到一块,让交换机认为是一个逻辑的接口 增加带宽的同时,还能起到链路冗余的作用 链路聚合分为二层链路聚合与三层链路聚合 二层链路聚合 不需要配置IP地址 两台PC配置好IP后默认情况下是可以相互…

sqlserver dba日常操作

文章目录 查询慢sql的方法sqlserver备份全备差异备日志备ldf备份事务备份 注意事项SQL Server 还原全备还原差异备份还原日志备/尾日志还原事务日志还原备份还原中的问题还原失败,需要某些权限重命名sql Server数据库名称失败 作业迁移单个迁移批量迁移 登陆账号迁移…

软件开发模型(架构师复习资料)

在计算机刚刚诞生的年代,计算机是一种只有天才才能掌握的工具。人们对软件的认知仅仅停留在程序的层面上,所谓的软件开发就是那些能够掌握计算机的天才们写的一些只有计算机才能理解的二进制序列。但随着技术的发展,软件的复杂度不断提高&…

安装android studio

记录一下安装android studio的过程: 1.首先安装android studio到某一文件夹后,在C盘用户目录下可以看到.android文件夹。C:\Users\22515\AppData\Local\Google目录下也会出现AndroidStudio2022.2文件夹。(注意:用户名&#xff0c…

算法通关第十九关-青铜挑战理解动态规划

大家好我是苏麟 , 今天聊聊动态规划 . 动态规划是最热门、最重要的算法思想之一,在面试中大量出现,而且题目整体都偏难一些对于大部人来说,最大的问题是不知道动态规划到底是怎么回事。很多人看教程等,都被里面的状态子问题、状态…

maui中实现加载更多 RefreshView跟ListView(2)

一个类似商品例表的下拉效果&#xff1a; 代码 新增个类为商品商体类 public class ProductItem{public string ImageSource { get; set; }public string ProductName { get; set; }public string Price { get; set; }}界面代码&#xff1a; <?xml version"1.0&quo…

Missing artifact org.wltea.analyzer:ik-analyzer:jar:5.0

没有找到【org.wltea.analyzer】 找到了【org.wltea.ik-analyzer】 https://github.com/wks/ik-analyzer https://github.com/wks/ik-analyzer.git https://code.google.com/archive/p/ik-analyzer/downloads?page2 C:\Users\Administrator\Desktop\ik-analyzer-master>m…

用bash写脚本

本章主要介绍如何使用bash写脚本。 了解通配符 了解变量 了解返回值和数值运算 数值的对比 判断语句 循环语句 grep的用法是“grep 关键字 file”&#xff0c;意思是从file中过滤出含有关键字的行。 例如&#xff0c;grep root /var/log/messages&#xff0c;意思是从/var/log/…

IDEA添加Apifox插件后,返回参数不详细解决办法

Apifox官方文档地址(文档中返回的是特殊情况&#xff0c;跟我现在项目的返回不一样&#xff0c;因此需要更改配置) 点击跳转到官方API地址 实现步骤分为两步&#xff1a;第一步&#xff1a;添加配置&#xff0c;第二步使用注解。 1.添加配置 打开Idea设置&#xff0c;添加配置…

带你学C语言~指针(2)

目录 &#x1f3c9;前言 &#x1f680; 数组名的理解 &#x1f680;使用指针访问数组 ✈一维数组传参的本质 ✈冒泡排序 &#x1f3c6;二级指针 &#x1f3c6;指针数组 &#x1f3c6;指针数组模拟二维数组 &#x1f389;结束语 &#x1f3c9;前言 上一章&#xff0c;小…

element组件库的日期选择器如何限制?

本次项目中涉及到根据日期查找出来的数据进行调整,所以修改的数据必须是查找范围内的数据.需要对调整数据的日期进行限制,效果如下: 首先我们使用了element 组件库的日期选择器,其中灌完介绍, picker-options中函数disabledDate可以设置禁用状态,代码如下: <el-date-pickerv…

【SpringBoot】参数校验及异常处理

实现注册功能时经常遇到参数校验的问题。 参数校验 引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId> </dependency>参数前添加注解&#xff0c;并指定校验规…

【数据分析之Numpy】Numpy中复制函数numpy.repeat()与numpy.tile()的使用方法及区别

一、简介 numpy.repeat()与numpy.tile()都是Numpy库中的复制函数&#xff0c;用于将数组中的元素重复指定的次数。 numpy.repeat()函数接受三个参数&#xff1a;要重复的数组、重复的次数和重复的轴。 numpy.tile()函数接受两个参数&#xff1a;要重复的数组和重复的次数。 二…

Github 2023-12-18 开源项目周报 Top14

根据Github Trendings的统计&#xff0c;本周(2023-12-18统计)共有14个项目上榜。根据开发语言中项目的数量&#xff0c;汇总情况如下&#xff1a; 开发语言项目数量TypeScript项目4Python项目4Jupyter Notebook项目3非开发语言项目1JavaScript项目1Rust项目1Go项目1 基于项目…

自然语言处理阅读第二弹

HuggingFace 镜像网站模型库 NLP中的自回归模型和自编码模型 自回归&#xff1a;根据上文内容预测下一个可能的单词&#xff0c;或者根据下文预测上一个可能的单词。只能利用上文或者下文的信息&#xff0c;不能同时利用上文和下文的信息。自编码&#xff1a;对输入的句子随…

MyBatis的查询方法!!!

准备工作&#xff1a;1.创建一个maven工程&#xff0c;然后将pojo类导入到项目中去。 2.导入依赖到pom.xml文件中 3.在resources中创建log4j.properites和mybatis-config.xml 4.创建UserMapper接口和UserMapper.xml文件 5.创建测试类MyBatisTest 1.创建Maven工程&#xff0c;还…