Flume 与 Kafka 整合实战

目录

一、Kafka 作为 Source【数据进入到kafka中,抽取出来】

(一)环境准备与配置文件创建

(二)创建主题

(三)测试步骤

二、Kafka 作为 Sink数据从别的地方抽取到kafka里面】

(一)编写配置脚本

(二)创建 topic

(三)测试过程

三、应用场景示例 

四、总结


        在大数据处理的生态系统中,Flume 和 Kafka 都是非常重要的组件。Flume 擅长收集、聚合和传输大量的日志数据等,而 Kafka 则是一个高性能的分布式消息队列,能够处理海量的实时数据。将 Flume 和 Kafka 进行整合,可以构建强大的数据处理管道,实现数据的高效采集、传输和处理。本文将详细介绍 Flume 和 Kafka 整合的两种常见方式:Kafka 作为 Source 和 Kafka 作为 Sink。

一、Kafka 作为 Source【数据进入到kafka中,抽取出来】

 

(一)环境准备与配置文件创建

        在 Flume 的 conf 文件夹下,创建一个名为 kafka - memory - logger.conf 的脚本文件。这里需要注意,在实际操作中可能会遇到错误,例如 kafka 的每一批次的读取数量大于了 channel 的容量。这种情况下的解决方案是要么降低 kafka 的每一批次读取的容量,要么提高 channel 的容量。

https://flume.liyifeng.org/#kafka-source

kafka-memory-logger.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 100
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sources.r1.kafka.topics = five
a1.sources.r1.kafka.consumer.group.id = qiaodaohu# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sinks.k1.type = logger
a1.sinks.k1.maxBytesToLog = 128

 

(二)创建主题

        接着创建一个 topic,名字可以叫做 kafka - flume,当然也可以直接使用以前创建好的主题。

kafka-topics.sh --create --topic kafka-flume --bootstrap-server bigdata01:9092 --partitions 3 --replication-factor 1

 

(三)测试步骤

首先启动一个消息生产者,向 topic 中发送消息。

kafka-console-producer.sh --topic kafka-flume --bootstrap-server bigdata01:9092

然后启动 Flume,接收消息并查看 log 日志,这样就可以验证数据是否能够从 Kafka 成功抽取到 Flume 中并进行后续处理。

在flume的flumeconf 文件夹下

flume-ng agent -n a1 -c ../conf -f ./kafka-memory-logger.conf -Dflume.root.logger=INFO,console

二、Kafka 作为 Sink数据从别的地方抽取到kafka里面】

 

 

(一)编写配置脚本

编写一个名为 flume - kafka - sink.conf 的脚本,内容如下:

##a1就是flume agent的名称
## source r1
## channel c1
## sink k1
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = bigdata01
a1.sources.r1.port = 44444# 修改sink为kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092
a1.sinks.k1.kafka.topic = five
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

这里的流程是 netcat(模拟数据源)→ memory(内存通道)→ kafka。

 

(二)创建 topic

使用以下命令创建 topic(flume - kafka):

kafka-topics.sh --create --topic flume-kafka --bootstrap-server bigdata01:9092 --partitions 3 --replication-factor 1

 

(三)测试过程

启动 Flume:

flume-ng agent -n a1 -c conf -f $FLUME_HOME/job/flume-kafka-sink.conf -Dflume.root.logger=INFO,console

使用 telnet 命令,向端口发送消息:

yum -y install telnettelnet bigdata01 44444

在窗口不断地发送文本数据,数据就会被抽取到 Kafka 中。


使用消费者获取 Kafka 数据:

kafka-console-consumer.sh --topic flume-kafka --bootstrap-server bigdata01:9092 --from-beginning

 

三、应用场景示例 

        假定有这样一个场景:Flume 可以抽取不断产生的日志,抽取到的日志数据,发送给 Kafka,Kafka 经过处理,可以展示在页面上,或者进行汇总统计。这样就实现了一定的实时效果,在实际的大数据处理流程中,这种整合方式能够有效地处理海量的实时数据,提高数据处理的效率和可靠性。

四、总结

        通过 Flume 和 Kafka 的整合,我们能够构建更加灵活、高效的数据处理架构,满足不同场景下的大数据处理需求,为后续的数据挖掘、分析等提供坚实的数据基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/481147.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从零开始理解JVM:对象的生命周期之对象销毁(垃圾回收)

一、JVM参数 在学垃圾回收器之前,我们先要知道,jvm参数是怎么回事。因为配置各种回收器,必须对应各种参数设置。 标准参数(-) 所有的JVM实现都必须实现这些参数的功能,而且向后兼容 -help-version 非标准参…

win10中使用ffmpeg的filter滤镜

1 给视频加文字水印 1.1 添加播放时间 ffmpeg -i input.mp4 -vf "drawtextfontfileC\\:/Windows/fonts/consola.ttf:fontsize30:fontcolorwhite:timecode00\:00\:00\:00:rate25:textTCR\::boxcolor0x000000AA:box1:x20:y20" -y output.mp4 在视频的x20:y20位置添加t…

模拟器快速上手,助力HarmonyOS应用/服务高效开发

文章目录 1 创建模拟器1)打开设备管理界面2)设置本地模拟器实例存储路径3)创建一个模拟器(1)选择模拟器设备(2)创建模拟器(3)启动模拟器(4)关闭模…

HarmonyOS(61) 组件间状态共享的分类以及状态选择器的选取优先级

状态共享 状态共享的分类状态共享选择器State与prop\Link\ObservedObjectLink组合的区别合理选择装饰器的顺序参考资料 状态共享的分类 HarmonyOS的组件之间是可以共享状态数据了,不同的组件之间,状态共享的场景也不一样,根据共享范围从小到…

高德地图 Readme GT 定制版 10.25.0.3249 | 极致简洁

这款定制版高德地图去除了广告,运行速度更快。虽然没有车道级导航、打车功能和红绿灯倒计时等功能,但支持正常登录和收藏功能。检测更新始终为最新版本。 大小:82.5M 下载地址: 百度网盘:https://pan.baidu.com/s/1Y…

去中心化物理基础设施网络(DePIN):重塑未来的基石

一、引言:DePIN的定义与背景 什么是DePIN? 去中心化物理基础设施网络(DePIN,Decentralized Physical Infrastructure Networks)是利用区块链和去中心化技术管理、优化和激励物理资源分配的一种新兴模式。与传统集中式…

模型 布鲁姆法则

系列文章 分享 模型,了解更多👉 模型_思维模型目录。分层提升思维力。 1 布鲁姆法则的应用 1.1 布鲁姆法则在产品开发流程中的应用 背景: 在产品开发领域,创新和效率是关键。布鲁姆法则可以帮助产品经理和设计师系统地提升产品开…

恒创科技:服务器操作系统和客户端操作系统之间的区别

客户端操作系统和服务器操作系统是两种不同的操作系统,旨在满足计算机网络环境中的特定目的。虽然每种类型的操作系统在基本功能方面都有一些相似之处,但它们针对不同的用例进行了优化,并具有针对其特定角色量身定制的特定功能。 什么是服务器…

Flink的双流join理解

如何保证Flink双流Join准确性和及时性、除了窗口join还存在哪些实现方式、究竟如何回答才能完全打动面试官呢。。你将在文中找到答案。 1 引子 1.1 数据库SQL中的JOIN 我们先来看看数据库SQL中的JOIN操作。如下所示的订单查询SQL,通过将订单表的id和订单详情表ord…

2024学习之前端微信小程序开发教程,从入门到精通-含基础+实战+源码code

目录 一、简单介绍 二、课程需知 三、内容编排 1、小程序基础  起步式 目录结构 小程序框架 场景值  逻辑层 视图层 组件 视图容器 基础内容 表单组件 导航 媒体组件 Api 路由 界面 交互 网络 数据缓存 自定义组件 2、项目实战 …

HarmonyOS4+NEXT星河版入门与项目实战(23)------组件转场动画

文章目录 1、控件图解2、案例实现1、代码实现2、代码解释3、实现效果4、总结1、控件图解 这里我们用一张完整的图来汇整 组件转场动画的用法格式、属性和事件,如下所示: 2、案例实现 这里我们对上一节小鱼游戏进行改造,让小鱼在游戏开始的时候增加一个转场动画,让小鱼自…

数据预处理方法—特征选择、特征缩放、特征构造

特征选择 1.1 原理 特征选择是选择对模型训练最重要的特征,减少数据维度,去除冗余或不相关特征,提高模型性能的性能和训练速度,减少过拟合。 1.2 核心公式 可以使用基于树模型的特征重要性度量,如在随机森林中计算特…

【C++/Qt 】使用QCustomplot类打造一款数学函数图像生成工具(支持latex公式渲染+Python连接AI大模型)

✨✨ Rqtz 个人主页 : 点击✨✨ 🌈Qt系列专栏:点击 软件介绍 基于Qt的开源项目QCustomplot类的一款在线的数学函数图像生成工具,涉及到了数学的latex公式渲染,如何将latex语法转换为Python的函数,和如何在Qt中使用QCustomplot类进…

分页查询功能

EmployeeController /** * 员工分页查询 * * param employeePageQueryDTO * return */ GetMapping("/page") ApiOperation("员工分页查询") public Result<PageResult> page(EmployeePageQueryDTO employeePageQueryDTO) { log.info("…

mp4视频流推送的学习

一、依赖引入&#xff1a; ①使用 CDN 的播放器代码 <!-- 引入 xgplayer 核心 --> <script src"https://unpkg.byted-static.com/xgplayer/3.0.10/dist/index.min.js" charset"utf-8"></script><!-- 引入 xgplayer mp4 插件 -->…

C++趣味编程:基于树莓派Pico的模拟沙漏-倾斜开关与LED的互动实现

沙漏,作为一种古老的计时工具,利用重力让沙子通过狭小通道,形成了计时效果。在现代,我们可以通过电子元件模拟沙漏的工作原理。本项目利用树莓派Pico、倾斜开关和LED,实现了一个电子沙漏。以下是项目的详细技术解析与C++代码实现。 一、项目概述 1. 项目目标 通过倾斜开关…

PyG教程:MessagePassing基类

PyG教程&#xff1a;MessagePassing基类 一、引言二、如何自定义消息传递网络1.构造函数2.propagate函数3.message函数4.aggregate函数5.update函数 三、代码实战1.图数据定义2.实现GNN的消息传递过程3.完整代码4.完整代码的精简版本 四、总结1.MessagePassing各个函数的执行顺…

Linux—进程学习—04(进程地址空间学习)

目录 Linux—进程学习—41.程序地址空间1.1虚拟地址空间的现象1.2虚拟地址空间的理解(感性) 2.进程地址空间2.0 mm_struct结构体2.1 mm_struct结构体的源代码2.2分页&虚拟地址空间解释前面的实验现象 2.3进程地址空间存在的原因2.3.1第一个原因2.3.2第二个原因2.3.3第三个原…

信息安全实验--密码学实验工具:CrypTool

1. CrypTool介绍&#x1f4ad; CrypTool 1的开源教育工具&#xff0c;用于密码学研究。通过CrypTool 1&#xff0c;可以实现加密和解密操作&#xff0c;数字签名。CrypTool1和2有很多区别的。 2. CrpyTool下载&#x1f527; 在做信息安全实验--密码学相关实验时&#xff0c;发…

nodejs30: CSS 剪辑路径clip-path导致伪元素不可见问题及解决方法

相关问题 应用圆角裁剪时无法显示::after 取消clip-path设置&#xff1a; 完整问题代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, i…