关于Flume-Kafka-Flume的模式进行数据采集操作


       测试是否连接成功:

        在主节点flume目录下输入命令:

bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console
# 这个file_to_kafka.conf文件就是我们的配置文件

 

        然后在另一台节点输入命令进行消费数据:

 kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log

        然后再开一个主节点终端,在这个主节点上面在对应生成数据的文件追加数据

         

        这样就可以看见第一个主节点的终端和消费节点上面有数据变化了! 

 


         下面这个是配置拦截器,把json格式的内容进行消费,其他的进行拦截

        Flume采集数据到kafka的配置conf文件内容:

#定义组件

#1、定义source、channel、agent名称
a1.sources = r1
a1.channels = c1
#配置source

#2、描述source
a1.sources.r1.type = TAILDIR

#指定监控的组名
a1.sources.r1.filegroups = f1

#指定f1组监控的路径
a1.sources.r1.filegroups.f1 = /opt/software/applog/log/app.*

#指定断点续传的文件
a1.sources.r1.positionFile = /opt/software/flume/taildir_position.json
# 配置拦截器
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.ETLInterceptor$Builder
#配置channel

#3、描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

#指定kafka集群
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092

#指定数据写到kafka哪个topic
a1.channels.c1.kafka.topic = topic_log

#是否以Event对象的形式写入kafka
a1.channels.c1.parseAsFlumeEvent = false
#组装 

#4、关联source->channel
a1.sources.r1.channels = c1

         如果一开始测试我们flume和kafka是否能成功采集数据的时候,我们应该先把拦截器的两行配置先删除,后面再根据我们需要的内容进行拦截对应的内容。就比如:我们期望我们采集到数据是json格式的,如果不是json格式的话,我们就放弃这个数据。

     具体操作:

(1)创建Maven工程flume-interceptor

(2)创建包:com.gugu.gmall.flume.interceptor

(3)在pom.xml文件中添加如下配置

        

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

 在com.gugu.gmall.flume.utils包下创建JSONUtil类

package com.gugu.gmall.flume.utils;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONException;public class JSONUtil {
/*
* 通过异常判断是否是json字符串
* 是:返回true  不是:返回false
* */public static boolean isJSONValidate(String log){try {JSONObject.parseObject(log);return true;}catch (JSONException e){return false;}}
}

在com.gugu.gmall.flume.interceptor包下创建ETLInterceptor类 

package com.gugu.gmall.flume.interceptor;import com.atguigu.gmall.flume.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;public class ETLInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {//1、获取body当中的数据并转成字符串byte[] body = event.getBody();String log = new String(body, StandardCharsets.UTF_8);//2、判断字符串是否是一个合法的json,是:返回当前event;不是:返回nullif (JSONUtil.isJSONValidate(log)) {return event;} else {return null;}}@Overridepublic List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()){Event next = iterator.next();if(intercept(next)==null){iterator.remove();}}return list;}public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}}@Overridepublic void close() {}
}

        然后进行打包,复制到我们flume下的lib目录下就可以了!

        然后再和上面测试一样进行测试连接,是否成功把非json格式的数据拦截成功!


感谢各位的观看,创作不易,能不能给哥们来一个点赞呢!!!

好了,今天的分享就这么多了,有什么不清楚或者我写错的地方,请多多指教!

私信,评论我呗!!!!!!

关注我下一篇不迷路哦!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/194082.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

三菱FX3U小项目—机床定时器延时启动

目录 一、项目描述 二、IO口分配 三、项目程序 四、总结 一、项目描述 为了防止工人操作失误&#xff0c;启动按钮需要按住1s后&#xff0c;设备才启动&#xff0c;启动后第一台电机启动10s后第二台电机自动启动&#xff0c;当按下停止按钮时&#xff0c;两台电机同时停止。…

Django部署时静态文件配置的坑

Django部署时静态文件配置配置的坑 近期有个需求是用django进行开发部署&#xff0c;结果发现静态文件配置的坑是真的多&#xff0c;另外网上很多的内容也讲不清楚原理&#xff0c;就是这样这样&#xff0c;又那样那样&#xff0c;进了不少坑&#xff0c;这里记录一下关于css,…

【Apache Doris】审计日志插件 | 快速体验

【Apache Doris】审计日志插件 | 快速体验 一、 环境信息1.1 硬件信息1.2 软件信息 二、 审计日志插件介绍三、 快速 体验3.1 AuditLoader 配置3.1.1 下载 Audit Loader 插件3.1.2 解压安装包3.1.3 修改 plugin.conf 3.2 创建库表3.3 初始化3.4 验证 一、 环境信息 1.1 硬件信…

windiws docker 部署jar window部署docker 转载

Windows环境下从安装docker到部署前后端分离项目(springboot+vue) 一、前期准备 1.1所需工具: 1.2docker desktop 安装 二、部署springboot后端项目 2.1 部署流程 三、部署vue前端项目 3.1相关条件 3.2部署流程 四、前后端网络请求测试 一、前期准备 1.1所需工具: ①docke…

小米手环8pro重新和手机配对解决办法

如果更换了手机&#xff0c;那么小米手环8pro是无法和新手机自动连接的。 但是在新手机上直接连接又连接不上&#xff0c;搜索蓝牙根本找不到手环的蓝牙。 解决办法就是&#xff1a; 把手环恢复出厂&#xff01;&#xff01;&#xff01;&#xff01;&#xff01; 是的&…

苹果MAC安装绿盾出现问题,安装时没有出现填服务器地址的页面,现在更改不了也卸载不了绿盾 怎么处理?

环境: Mac mini M1 Mac os 11.0 绿盾v6.5 问题描述: 苹果MAC安装绿盾出现问题,安装时没有出现填服务器地址的页面,现在更改不了也卸载不了绿盾 怎么处理? 解决方案: 大部分企业是Windows和Mac终端混合使用,在进行文档加密管理时通常会遇到不兼容的现象,而为了统一…

Scala---数据基础

一、数据类型 二、变量和常量的声明 定义变量或者常量的时候&#xff0c;也可以写上返回的类型&#xff0c;一般省略&#xff0c;如&#xff1a;val a:Int 10常量不可再赋值 1./** 2. * 定义变量和常量 3. * 变量 :用 var 定义 &#xff0c;可修改 4. * 常量 :用 val 定…

基于IGT-DSER实现工业触摸屏与PLC设备之间WIFI无线通讯

本文是基于IGT-DSER系列智能网关设备实现工业触摸屏与PLC设备之间WIFI无线通讯的案例。PLC之间无线通讯的案例 网络结构如下图&#xff0c;触摸屏通过网线连接IGT-DSERWIFI智能网关&#xff0c;实现WIFI的AP功能&#xff1b;一台串口型PLC和一台网口型PLC分别通过IGT-WSER智能网…

数据同步工具调研选型:SeaTunnel 与 DataX 、Sqoop、Flume、Flink CDC 对比

产品概述 Apache SeaTunnel 是一个非常易用的超高性能分布式数据集成产品&#xff0c;支持海量数据的离线及实时同步。每天可稳定高效同步万亿级数据&#xff0c;已应用于数百家企业生产&#xff0c;也是首个由国人主导贡献到 Apache 基金会的数据集成顶级项目。 SeaTunnel 主…

2023数维杯国际赛数学建模竞赛选题建议及B题思路讲解

大家好呀&#xff0c;2023年第九届数维杯国际大学生数学建模挑战赛今天早上开赛啦&#xff0c;在这里先带来初步的选题建议及思路。 目前团队正在写B题和D题完整论文&#xff0c;后续还会持续更新哈&#xff0c;大家三连关注一下防止迷路。 注意&#xff0c;本文只是比较简略…

【Linux系统化学习】探索进程的奥秘 | 第一个系统调用

个人主页点击直达&#xff1a;小白不是程序媛 Linux系列专栏&#xff1a;系统化学习Linux 目录 进程的概念 进程的管理 描述进程——pcb 组织进程 进程在排队 Linux下的进程 Linux组织进程 查看进程 查看可执行程序的进程 第一个系统调用 "杀掉进程" 进程…

租赁小程序|租赁系统一种新型的商业模式

租赁市场是一个庞大的市场&#xff0c;它由出租人和承租人组成&#xff0c;以及相关的中介机构和供应商等。随着经济的发展和人们对灵活性的需求增加&#xff0c;租赁市场也在不断发展和壮大。特别是在共享经济时代&#xff0c;租赁市场得到了进一步的推动和发展。租赁系统是一…

2023年9月 少儿编程 中国电子学会图形化编程等级考试Scratch编程一级真题解析(选择题)

2023年9月scratch编程等级考试一级真题 选择题&#xff08;共25题&#xff0c;每题2分&#xff0c;共50分&#xff09; 1、下列哪项内容是不可以修改的 A、角色名称 B、造型名称 C、背景名称 D、舞台名称 答案&#xff1a;D 考点分析&#xff1a;考查scratch相关知识&am…

03-关系和非关系型数据库对比

关系和非关系型数据库对比 关系型数据库(RDBMS)&#xff1a;MySQL、Oracl、DB2、SQLServer 非关系型数据库(NoSql)&#xff1a;Redis、Mongo DB、MemCached 插入数据结构的区别 传统关系型数据库是结构化数据,向表中插入数据时都需要严格的约束信息(如字段名,字段数据类型,字…

【装包拆包----泛型】

文章目录 装箱和拆箱泛型创建一个泛型数组泛型的上界泛型方法 装箱和拆箱 装箱&#xff1a; 把基本数据类型给到引用数据类型 public static void main(String[] args) {//自动装包//第一种装包Integer c 12;//第二种装包int a 7;Integer b a;//显示装包Integer aa Intege…

课程32:.Net Core Web API部署IIS

这里写目录标题 &#x1f680;前言前言一、服务器环境配置1.1 安装 ASP.NET Core模块/托管捆绑包1.2 检查是否安装成功 二、项目发布2.1 选择发布方式2.2 发布配置2.3 发布 三、服务器部署3.1 IIS添加网站3.2 数据库链接配置3.3 让IIS支持.NET Web Api3.4 验证 四、最后 &#…

idea中git 移除对某个文件的跟踪

应用场景如下 某个log 文件&#xff0c;被同事用git 提交到了服务器&#xff0c;本地拉去之后我们的跟踪也会受影响 取消跟踪的方法如下&#xff1a; 删除本地缓存 git rm --cached "logs/test.log" 提交无效的log git commit -m "ignore log" 再将lo…

Postman接口Mock Servier服务器

近期在复习Postman的基础知识&#xff0c;在小破站上跟着百里老师系统复习了一遍&#xff0c;也做了一些笔记&#xff0c;希望可以给大家一点点启发。 应用场景&#xff1a;后端的接口还没有开发完成&#xff0c;前端的业务需要调用后端的接口&#xff0c;可以使用mock模拟。 一…

Linux_虚拟机常用目录汇总

根目录&#xff08;cd /&#xff09;&#xff1a;/ 表示根目录&#xff0c;cd和 / 之间有个空格&#xff01; 用户目录&#xff08;cd ~&#xff09;&#xff1a;~ 表示用户目录&#xff0c;也称为家目录。cd 和 ~ 之间有个空格&#xff01; 当前路径&#xff1a;执行 pwd 指令…

微信加好友操作频繁了,怎么办?

近来&#xff0c;微信的风控是越来越严重&#xff0c;因为本身微信是作为一个社交软件&#xff0c;但流量大适合用来做私域营销。在日常使用微信中&#xff0c;我们也要了解下微信加好友的规则。 目前微信加人的规则是&#xff1a; 1、通过附近人功能加人上限15人/天&#xf…