在处理大规模数据流时,Apache Flume 是一款功能强大的数据聚合工具,它可以通过拦截器在运行时对Event进行修改或丢弃。本文将详细讲解Flume中的拦截器,包括时间戳拦截器、Host添加拦截器、静态拦截器以及如何自定义拦截器。
Flume入门到实践--Flume的安装与基础概念与安装实战-CSDN博客
拦截器
拦截器的作用
拦截器用于在事件流经Flume时,对其进行拦截、处理(比如修改、增强)、或者丢弃。
1. 时间戳拦截器
时间戳拦截器是Flume中非常实用的一个功能,它会自动为每个Event的header添加一个当前的时间戳。这对于后续的数据存储和分析非常有用,尤其是在需要时间序列数据的场景中。
使用场景
当数据需要带有时间戳存储到HDFS,并且HDFS的目录结构中包含时间转义字符时,这个拦截器非常有用。
配置示例
a1.sources.r1.interceptors = i1
i1 是拦截器的名字,自己定义的
a1.sources.r1.interceptors.i1.type = timestamp
指定i1 这个拦截器的类型
hdfs中,如果文件夹使用了时间相关的转义字符,比如
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minutea1.sinks.k1.hdfs.useLocalTimeStamp =true
也可以使用时间戳拦截器
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/data/c.txta1.sources.r1.channels = c1a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
# round 用于控制含有时间转义符的文件夹的生成规则
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# i1 是拦截器的名字,自己定义的
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
因为必须保证header中有时间戳timestamp这个KV键值对。
假如hdfs中使用了时间转义字符,此时必须指定时间,两种方案
1)使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp =true
2)使用时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
2. Host添加拦截器
Host拦截器可以将数据来源的主机名或IP地址添加到Event的header中。这对于追踪数据来源和进行数据分析时识别数据源非常有用。
配置示例
a1.sources = r1
a1.channels = c1
a1.sinks = s1a1.sources.r1.type= http
a1.sources.r1.bind = bigdata01
a1.sources.r1.port = 6666a1.sources.r1.interceptors=i2a1.sources.r1.interceptors.i2.type=host
a1.sources.r1.interceptors.i2.preserveExisting=false
a1.sources.r1.interceptors.i2.useIP=true
a1.sources.r1.interceptors.i2.hostHeader=hostname
# hostname=192.168.233.128
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=/flume/%Y/%m/%d/%H%M
a1.sinks.s1.hdfs.filePrefix=%{hostname}
a1.sinks.s1.hdfs.fileSuffix=.loga1.sinks.s1.hdfs.rollInterval=60a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=10
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=truea1.sources.r1.channels=c1
a1.sinks.s1.channel=c1这个例子可以完美的说明host拦截器是干嘛的,拦截住之后 在Header中添加一个 hostname=192.168.32.128
在hdfs想使用hostname的时候,就可以通过%{hostname}9870 hdfs的web访问端口
9820 是hdfs底层进行通信的端口jps -ml 查看java进行,带详细信息
测试
curl 起始就是通过命令模拟浏览器
curl http://www.baidu.com
curl http://bigdata01:9870/dfshealth.html#tab-overview
curl -X POST -d '[{"headers":{"state":"USER"},"body":"this my multiplex to c1"}]' http://bigdata01:6666
3. 静态拦截器
静态拦截器允许我们在Event的header中添加自定义的静态键值对。这在需要为数据添加额外信息时非常有用,比如添加特定的标签或分类信息。
配置示例
a1.sources.r1.interceptors = i3
a1.sources.r1.interceptors.i3.type = static
a1.sources.r1.interceptors.i3.key = name
a1.sources.r1.interceptors.i3.value = zhangsan
自定义拦截器
Flume支持自定义拦截器,这为处理特定的数据格式或进行复杂的数据转换提供了可能。通过编写Java代码实现自定义的拦截器,我们可以对接收到的数据进行任意形式的处理。
实现步骤
- 创建Maven项目:导入必要的Flume和JSON处理库(如Jackson或Fastjson)。
- 实现Interceptor接口:编写拦截器逻辑。
- 打包:将编写的拦截器打包成JAR文件,并放入Flume的lib目录下。
- 在Flume配置文件中配置自定义拦截器。
需求
处理数据样例:
log='{
"host":"www.baidu.com",
"user_id":"13755569427",
"items":[{"item_type":"eat","active_time":156234},{"item_type":"car","active_time":156233}]
}'结果样例:
[{"active_time":156234,"user_id":"13755569427","item_type":"eat","host":"www.baidu.com"},
{"active_time":156233,"user_id":"13755569427","item_type":"car","host":"www.baidu.com"}]
创建一个maven项目
导入jar包
xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.bigdata</groupId><artifactId>MyInterceptor</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core --><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.48</version></dependency></dependencies><!--可以使用maven中的某些打包插件,不仅可以帮助我们打包代码还可以打包所依赖的jar包--><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><configuration><!-- 禁止生成 dependency-reduced-pom.xml--><createDependencyReducedPom>false</createDependencyReducedPom></configuration><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><relocations><relocation><!-- 解决包冲突 进行转换--><pattern>com.google.protobuf</pattern><shadedPattern>shaded.com.google.protobuf</shadedPattern></relocation></relocations><artifactSet><excludes><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude></excludes></filter></filters><transformers><!-- 某些jar包含具有相同文件名的其他资源(例如属性文件)。 为避免覆盖,您可以选择通过将它们的内容附加到一个文件中来合并它们--><transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>reference.conf</resource></transformer><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>mainclass</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build></project>
打包插件:
builder --> plugins --> plugin
插件
<plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><configuration><!-- 禁止生成 dependency-reduced-pom.xml--><createDependencyReducedPom>false</createDependencyReducedPom></configuration><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><relocations><relocation><!-- 解决包冲突 进行转换--><pattern>com.google.protobuf</pattern><shadedPattern>shaded.com.google.protobuf</shadedPattern></relocation></relocations><artifactSet><excludes><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude></excludes></filter></filters><transformers><!-- 某些jar包含具有相同文件名的其他资源(例如属性文件)。 为避免覆盖,您可以选择通过将它们的内容附加到一个文件中来合并它们--><transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>reference.conf</resource></transformer><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>mainclass</mainClass></transformer></transformers></configuration></execution></executions></plugin>
示例代码
先测试一下:
json --> java 代码解析 json --> 实体 ,实体-->json 字符串 都需要使用工具
jackson、fastjson(阿里巴巴)
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class TestJson {public static void main(String[] args) {String log="{\n" +" 'host':'www.baidu.com',\n" +" 'user_id':'13755569427',\n" +" 'items':[\n" +" {\n" +" 'item_type':'eat',\n" +" 'active_time':156234\n" +" },\n" +" {\n" +" 'item_type':'car',\n" +" 'active_time':156233\n" +" }\n" +" ]\n" +"}";JSONObject jsonObject = JSON.parseObject(log);String host = jsonObject.getString("host");String user_id = jsonObject.getString("user_id");System.out.println(host);System.out.println(user_id);JSONArray items = jsonObject.getJSONArray("items");List list =new ArrayList<Map<String,String>>();for (Object item : items) {// {"active_time":156234,"item_type":"eat"}Map map = new HashMap<String,String>();String itemStr = item.toString();JSONObject jsonItem = JSON.parseObject(itemStr);String active_time = jsonItem.getString("active_time");String item_type = jsonItem.getString("item_type");System.out.println(active_time);System.out.println(item_type);map.put("active_time",active_time);map.put("user_id",user_id);map.put("item_type",item_type);map.put("host",host);list.add(map);}/*** 需要转化为:* * * [{"active_time":156234,"user_id":"13755569427","item_type":"eat","host":"www.baidu.com"},* * * {"active_time":156233,"user_id":"13755569427","item_type":"car","host":"www.baidu.com"}]*/String jsonString = JSON.toJSONString(list);System.out.println(jsonString);}
}
package com.bigdata;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;public class DemoInterceptor implements Interceptor {@Overridepublic void initialize() {}// 只需要关注 这个方法的写法/*** 需求:* log='{* "host":"www.baidu.com",* "user_id":"13755569427",* "items":[* {* "item_type":"eat",* "active_time":156234* },* {* "item_type":"car",* "active_time":156233* }* ]* }'** 需要转化为:* [{"active_time":156234,"user_id":"13755569427","item_type":"eat","host":"www.baidu.com"},* {"active_time":156233,"user_id":"13755569427","item_type":"car","host":"www.baidu.com"}]*/@Overridepublic Event intercept(Event event) {// 解析 文本数据变为另一种格式byte[] body = event.getBody();String content = new String(body);/*** {* "host":"www.baidu.com",* "user_id":"13755569427",* "items":[* {* "item_type":"eat",* "active_time":156234* },* {* "item_type":"car",* "active_time":156233* }* ]* }*/// 将一个json字符串变为 json 对象JSONObject jsonObject = JSON.parseObject(content);// 通过对象 获取 json 中的值String host = jsonObject.getString("host");String user_id = jsonObject.getString("user_id");// 通过对象获取json 数组JSONArray items = jsonObject.getJSONArray("items");// 定义一个集合,集合中是mapArrayList<HashMap<String, String>> list = new ArrayList<>();for (Object object: items) {String obj = object.toString();JSONObject jobj = JSON.parseObject(obj);String item_type = jobj.getString("item_type");String active_time = jobj.getString("active_time");HashMap<String, String> map = new HashMap<>();map.put("active_time",active_time);map.put("item_type",item_type);map.put("host",host);map.put("user_id",user_id);list.add(map);}// 将对象变为字符串String s = JSON.toJSONString(list);event.setBody(s.getBytes());return event;}// 这个方法可以调取 上面这个方法@Overridepublic List<Event> intercept(List<Event> list) {for (int i=0;i<list.size();i++) {Event oldEvent = list.get(i);Event newEvent = intercept(oldEvent);list.set(i,newEvent);}return list;}@Overridepublic void close() {}// 作用只有一个,就是new 一个自定义拦截器的类public static class BuilderEvent implements Builder{@Overridepublic Interceptor build() {return new DemoInterceptor();}@Overridepublic void configure(Context context) {}}
}
打包,上传至 flume 下的lib 下。
测试
编写一个flume脚本文件
testInter.conf
a1.sources = s1
a1.channels = c1
a1.sinks = r1a1.sources.s1.type = TAILDIR#以空格分隔的文件组列表。每个文件组表示要跟踪的一组文件
a1.sources.s1.filegroups = f1
#文件组的绝对路径
a1.sources.s1.filegroups.f1=/home/b.log#使用自定义拦截器
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = com.bigdata.DemoInterceptor$BuilderEventa1.channels.c1.type = file
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sinks.r1.type = hdfs
a1.sinks.r1.hdfs.path = /flume/202409
a1.sinks.r1.hdfs.fileSuffix= .log# 将上传的数据格式使用text类型,便于查看
a1.sinks.r1.hdfs.fileType=DataStream
a1.sinks.r1.hdfs.writeFormat=Texta1.sources.s1.channels = c1
a1.sinks.r1.channel = c1
运行该脚本
flume-ng agent -c ./ -f testInterceptor.conf -n a1 -Dflume.root.logger=INFO,console
出现错误
此时,说明我们打包的时候没有将这个jar包打包到自定义的jar包,可以通过手动的提交的方式解决这个问题。
将fast-json.jar 放入到 flume/lib
通过网盘分享的文件:fastjson-1.2.48.jar
假如你使用了打包插件,已经将这个 fast-json 打入了你的 jar 包中,无需该操作。
{"host":"www.baidu.com","user_id":"13755569427","items":[{"item_type":"eat","active_time":156234},{"item_type":"car","active_time":156233}]}
接着开始进行测试,必须先启动flume
编写一个脚本,模拟 b.log 中不断的产生json数据的场景。
#!/bin/bash
log='{
"host":"www.baidu.com",
"user_id":"13755569427",
"items":[
{
"item_type":"eat",
"active_time":156234
},
{
"item_type":"car",
"active_time":156233
}
]
}'
echo $log >> /home/b.log
保存,并且赋予权限:
chmod 777 createJson.sh执行这个脚本,就可以模拟不断的向 b.log中传输数据了
./createJson.sh
视频链接
10-时间戳拦截器_哔哩哔哩_bilibili
11-host拦截器_哔哩哔哩_bilibili
12-静态拦截器_哔哩哔哩_bilibili
13-自定义拦截器一_哔哩哔哩_bilibili
14-自定义拦截器二_哔哩哔哩_bilibili