0301taildir-source报错-flume-大数据

1 基础环境简介

linux系统:centos,前置安装:jdk、hadoop、zookeeper、kafka,版本如下

软件版本描述
centos7linux系统发行版
jdk1.8java开发工具集
hadoop2.10.0大数据生态基础组件
zookeeper3.5.7分布式应用程序协调服务
kafka3.0分布式mq组件
flume1.9.0分布式采集传输组件

2 报错

  • 场景1:动态监控目录多个日志变化,通过flume采集传输到kafka

  • 报错日志

    org.apache.flume.FlumeException: Error creating positionFile parent directoriesat org.apache.flume.source.taildir.TaildirSource.configure(TaildirSource.java:170)at org.apache.flume.conf.Configurables.configure(Configurables.java:41)at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:325)at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:105)at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:750)
    Caused by: java.nio.file.FileAlreadyExistsException: /export/server/flumeat sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)at sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)at java.nio.file.Files.createDirectory(Files.java:674)at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781)at java.nio.file.Files.createDirectories(Files.java:727)at org.apache.flume.source.taildir.TaildirSource.configure(TaildirSource.java:168)... 11 more
  • conf文件如下

    #定义组件
    a1.sources = r1
    a1.channels = c1#配置source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
    a1.sources.r1.positionFile = /export/server/flume/taildir_position.json#配置channel
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers = node1:9092,node2:9092
    a1.channels.c1.kafka.topic = topic_log01
    a1.channels.c1.parseAsFlumeEvent = false#组装 
    a1.sources.r1.channels = c1
    
  • 原因就是在创建positionFile的时候父目录已存在

  • 场景2:我们生成的日志文件app.log 每经过一天会按照日期重命名文件,然后生成新的app.log,此时flume会重新采集所有的日志信息,导致信息重复采集2次。

  • Taildir 说明: Taildir Source 维护了一个 json 格式的 position File,其会定期的往 position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。Position File 的格式如下:

    {"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}{"inode":2496275,"pos":12,"file":"/opt/module/flume/files2/log.txt"}
    
  • 而flume会同时判断Inode和file来确定是否同一文件

    注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统

    用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来

    识别文件。

3 解决

场景1解决方案有两种:

  1. 既然是创建父目录已存在,我们可以吧positionFile位置重新配置。

  2. 修改源代码,我们通过源代码找下处理逻辑,下载1.9.0版本的flume源代码,官网地址:https://archive.apache.org/dist/flume/,找到TailSource 170行

     @Overridepublic synchronized void configure(Context context) {String fileGroups = context.getString(FILE_GROUPS);Preconditions.checkState(fileGroups != null, "Missing param: " + FILE_GROUPS);filePaths = selectByKeys(context.getSubProperties(FILE_GROUPS_PREFIX),fileGroups.split("\\s+"));Preconditions.checkState(!filePaths.isEmpty(),"Mapping for tailing files is empty or invalid: '" + FILE_GROUPS_PREFIX + "'");String homePath = System.getProperty("user.home").replace('\\', '/');positionFilePath = context.getString(POSITION_FILE, homePath + DEFAULT_POSITION_FILE);Path positionFile = Paths.get(positionFilePath);try {// 此处创建父目录,如果存在报错Files.createDirectories(positionFile.getParent());} catch (IOException e) {throw new FlumeException("Error creating positionFile parent directories", e);}headerTable = getTable(context, HEADERS_PREFIX);batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);skipToEnd = context.getBoolean(SKIP_TO_END, DEFAULT_SKIP_TO_END);byteOffsetHeader = context.getBoolean(BYTE_OFFSET_HEADER, DEFAULT_BYTE_OFFSET_HEADER);idleTimeout = context.getInteger(IDLE_TIMEOUT, DEFAULT_IDLE_TIMEOUT);writePosInterval = context.getInteger(WRITE_POS_INTERVAL, DEFAULT_WRITE_POS_INTERVAL);cachePatternMatching = context.getBoolean(CACHE_PATTERN_MATCHING,DEFAULT_CACHE_PATTERN_MATCHING);backoffSleepIncrement = context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT,PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT);maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP,PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP);fileHeader = context.getBoolean(FILENAME_HEADER,DEFAULT_FILE_HEADER);fileHeaderKey = context.getString(FILENAME_HEADER_KEY,DEFAULT_FILENAME_HEADER_KEY);maxBatchCount = context.getLong(MAX_BATCH_COUNT, DEFAULT_MAX_BATCH_COUNT);if (maxBatchCount <= 0) {maxBatchCount = DEFAULT_MAX_BATCH_COUNT;logger.warn("Invalid maxBatchCount specified, initializing source "+ "default maxBatchCount of {}", maxBatchCount);}if (sourceCounter == null) {sourceCounter = new SourceCounter(getName());}}
    

    在这里插入图片描述

可以在创建父目录之前检测是否已存在,如果已存在,直接跳过创建即可,修改try代码块中内容如下

boolean exists = Files.exists(positionFile.getParent());if (!exists)Files.createDirectories(positionFile.getParent());

maven打包替换flume/lib/下 flume-taildir-source-1.9.0.jar 如图所示:在这里插入图片描述

重新运行,正常启动,如下图日志所示:在这里插入图片描述

kafka中新接收的数据如下图所示:在这里插入图片描述

场景2解决方案 把TailFile如下代码

  public boolean updatePos(String path, long inode, long pos) throws IOException {if (this.inode == inode && this.path.equals(path)) {setPos(pos);updateFilePos(pos);logger.info("Updated position, file: " + path + ", inode: " + inode + ", pos: " + pos);return true;}return false;}// 修改为public boolean updatePos(String path, long inode, long pos) throws IOException {if (this.inode == inode) {setPos(pos);updateFilePos(pos);logger.info("Updated position, file: " + path + ", inode: " + inode + ", pos: " + pos);return true;}return false;}

即不校验file只校验inode,具体这里不再去验证,有兴趣自己验证下

结语

如果小伙伴什么问题或者指教,欢迎交流。

❓QQ:806797785

参考链接:

[1]flume教学视频[CP/OL].2020-04-16.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/277672.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【PyTorch】成功解决ModuleNotFoundError: No module named ‘torch’

【PyTorch】成功解决ModuleNotFoundError: No module named ‘torch’ &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程&#x1f448; 希…

源码编译部署LAMP

编译部署LAMP 配置apache [rootzyq ~]#: wget https://downloads.apache.org/apr/apr-1.7.4.tar.gz --2023-12-11 14:35:57-- https://downloads.apache.org/apr/apr-1.7.4.tar.gz Resolving downloads.apache.org (downloads.apache.org)... 88.99.95.219, 135.181.214.104…

基于Springboot和Redis实现的在线选课系统

1.项目简介 1.1 介绍 毕业设计真的就是demo吗&#xff1f;作为工作前的最后一个校园项目&#xff0c;毕业设计应当尽可能的贴近企业实战&#xff0c;业务不必很复杂&#xff0c;但要做到麻雀虽小五脏俱全。本期学长跟大家一起分享如何开发一个在线选课系统&#xff0c;需求也…

从汇编来角度剖析C语言函数调用过程

C基础专栏&#xff1a;http://t.csdnimg.cn/WcEhj 目录 1.引言 2.寄存器 3.栈帧 4.函数调用前调用者的动作 5.被调用者在函数调用后的动作 6.被调用者返回前的动作 7.调用者在返回后的动作 8.总结 1.引言 当一个c函数被调用时&#xff0c;一个栈帧(stack frame)是如何被…

迁移学习怎么用

如果想实现一个计算机视觉应用&#xff0c;而不想从零开始训练权重&#xff0c;比方从随机初始化开始训练&#xff0c;更快的方式是下载已经训练好权重的网络结构&#xff0c;把这个作为预训练&#xff0c;迁移到你感兴趣的新任务上。ImageNet、PASCAL等等数据库已经公开在线。…

8:00面试,8:06就出来了,问的问题有点变态。。。

从小厂出来&#xff0c;没想到在另一家公司又寄了。 到这家公司开始上班&#xff0c;加班是每天必不可少的&#xff0c;看在钱给的比较多的份上&#xff0c;就不太计较了。没想到9月一纸通知&#xff0c;所有人不准加班&#xff0c;加班费不仅没有了&#xff0c;薪资还要降40%…

Springboot项目部署

1、Sping路径不需要有项目名&#xff0c;因为Springboot内置了tomcat&#xff0c;一个tomcat下面就部署了当前这一个项目&#xff0c;如果想要部署多个项目就要启动多个tomcat &#xff08;1&#xff09;一个项目多个端口 填写想要开的端口号 &#xff08;2&#xff09;部署多…

字符分类函数(iscntrl、i是space.....)---c语言

目录 一、定义二、字符分类函数2.1 -iscntrl&#xff08;&#xff09;2.1.1定义2.1.2使用举例 2.2 -isspace&#xff08;&#xff09;2.2.1描述2.2.2使用举例 2.3-isdigit()2.3.1描述2.3.2使用举例 2.4-isxdigit()2.4.1描述 2.5-islower()2.5.1描述2.5.2使用举例 2.6-isupper()…

Java基础夯实【进阶】——八股文【2024面试题案例代码】

1、Java当中什么是线程和进程 在Java中&#xff0c;线程和进程是两个非常重要的概念。进程可以被视为一个执行中的程序的实例&#xff0c;它拥有自己的内存空间和系统资源。而线程则是进程中的一个实体&#xff0c;由进程创建&#xff0c;并允许程序在同一时刻执行多个任务。J…

决策树 | 分类树回归树:算法逻辑

目录 一. 决策树(Decision Tree)1. 决策树的构建1.1 信息熵(Entropy)1.1.1 信息量&信息熵 定义1.1.2 高信息熵&低信息熵 定义1.1.3 信息熵 公式 1.2 信息增益(Information Gain)1.2.1 信息增益的计算1.2.2 小节 2. 小节2.1 算法分类2.2 决策树算法分割选择2.3 决策树算…

MechanicalSoup,一个非常实用的 Python 自动化浏览器交互工具库!

目录 前言 什么是 Python MechanicalSoup 库&#xff1f; 核心功能 使用方法 1. 安装 MechanicalSoup 库 2. 创建 MechanicalSoup 客户端 3. 打开网页并与之交互 实际应用场景 1. 网页自动化测试 2. 网络爬虫与数据提取 3. 网页自动化操作 4. 自动化填写和提交多个表单 5.…

柚见十三期(优化)

前端优化 加载匹配功能与加载骨架特效 骨架屏 : vant-skeleton index.vue中 /** * 加载数据 */ const loadData async () > { let userListData; loading.value true; //心动模式 if (isMatchMode.value){ const num 10;//推荐人数 userListData await myA…

django-comment-migrate 模型注释的使用

django-comment-migrate 的使用 django-comment-migrate 是一个 Django 应用&#xff0c;用于将模型注释自动迁移到数据库表注释中。它可以帮助您保持数据库表注释与模型定义的一致性&#xff0c;并提高代码的可读性。 安装 要使用 django-comment-migrate&#xff0c;您需要…

线程是如何在 6 种状态之间转换的

线程是如何在 6 种状态之间转换的 线程的 6 种状态New 新创建Runnable 可运行阻塞状态Blocked 被阻塞Waiting 等待Timed Waiting 限期等待 注意点 主要学习线程是如何在 6 种状态之间转换。 线程的 6 种状态 就像生物从出生到长大、最终死亡的过程一样&#xff0c;线程也有自己…

搭建Hadoop3.x完全分布式集群

零、资源准备 虚拟机相关&#xff1a; VMware workstation 16&#xff1a;虚拟机 > vmware_177981.zipCentOS Stream 9&#xff1a;虚拟机 > CentOS-Stream-9-latest-x86_64-dvd1.iso Hadoop相关 jdk1.8&#xff1a;JDK > jdk-8u261-linux-x64.tar.gzHadoop 3.3.6&am…

Netty架构详解

文章目录 概述整体结构Netty的核心组件逻辑架构BootStrap & ServerBootStrapChannelPipelineFuture、回调和 ChannelHandler选择器、事件和 EventLoopChannelHandler的各种ChannelInitializer类图 Protocol Support 协议支持层Transport Service 传输服务层Core 核心层模块…

第七节:Vben Admin权限-后端获取路由和菜单

系列文章目录 第一节:Vben Admin介绍和初次运行 第二节:Vben Admin 登录逻辑梳理和对接后端准备 第三节:Vben Admin登录对接后端login接口 第四节:Vben Admin登录对接后端getUserInfo接口 第五节:Vben Admin权限-前端控制方式 第六节:Vben Admin权限-后端控制方式 第七节…

Unity2019.2.x 导出apk 安装到安卓Android12+及以上的系统版本 安装出现-108 安装包似乎无效的解决办法

Unity2019.2.x 导出apk 安装到安卓Android12及以上的系统版本 安装出现-108 安装包似乎无效的解决办法 导出AndroidStudio工程后 需要设置 build.gradle文件 // GENERATED BY UNITY. REMOVE THIS COMMENT TO PREVENT OVERWRITING WHEN EXPORTING AGAINbuildscript {repositor…

河马优化算法(HO)-2024年Nature子刊新算法 公式原理详解与性能测评 Matlab代码免费获取

声明&#xff1a;文章是从本人公众号中复制而来&#xff0c;因此&#xff0c;想最新最快了解各类智能优化算法及其改进的朋友&#xff0c;可关注我的公众号&#xff1a;强盛机器学习&#xff0c;不定期会有很多免费代码分享~ 目录 原理简介 一、种群初始化 二、河马在河流或…

【Python编程基础】第一节.Python基本语法(上)

文章目录 前言⼀、Python介绍二、Python环境配置三、Pycharm 书写代码四、Python基本语法 4.1 print 函数的简单使用 4.2 注释 4.3 Python 代码中三种波浪线和 PEP8 4.4 在 cmd 终端中运⾏ Python 代码 4.5 变量 4.6 数据类型 4.7 类型转换…