(二十八)大数据实战——Flume数据采集之kafka数据生产与消费集成案例

前言

本节内容我们主要介绍一下flume数据采集和kafka消息中间键的整合。通过flume监听nc端口的数据,将数据发送到kafka消息的first主题中,然后在通过flume消费kafka中的主题消息,将消费到的消息打印到控制台上。集成使用flume作为kafka的生产者和消费者。关于nc工具、flume以及kafka的安装部署,这里不在赘述,请读者查看作者往期博客内容。整体架构如下:

正文

  • 启动Kafka集群,创建first主题

- 启动Kafka集群

- 创建first主题

kafka-topics.sh --bootstrap-server hadoop101:9092 --create --topic first --partitions 3 --replication-factor 3

- 查看first主题详情

kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic first

  • 在hadoop101服务器flume安装目录/opt/module/apache-flume-1.9.0/job下创建nc监听服务

 - 创建nc监听的flume任务:job-netcat-flume-kafka.conf

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 1111
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  • 在hadoop102服务器flume安装目录/opt/module/apache-flume-1.9.0/job下创建kafka监听r任务

-  创建kafka监听的flume任务:job-kafka-flume-console.conf

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = logger
# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  • 在hadoop102服务器启动kafka监听任务job-kafka-flume-console.conf

- 启动job-kafka-flume-console.conf任务

bin/flume-ng agent -c conf/ -n a1 -f job/job-kafka-flume-console.conf -Dflume.root.logger=INFO,console

  •  在hadoop101服务器启动nc监听任务job-netcat-flume-kafka.conf

 - 启动job-netcat-flume-kafka.conf任务

bin/flume-ng agent -c conf/ -n a1 -f job/job-netcat-flume-kafka.conf -Dflume.root.logger=INFO,console

  •  使用netcat工具发送数据到nc服务1111端口

- 发送nc消息

  • 查看结果 

- 控制台结果

结语

该案例证明了flume1成功采集到了nc监听端口的数据,并将数据发送到了kafka主题first中,flume2成功从kafka主题中消费到了数据并打印到了控制台。关于Flume数据采集之kafka数据生产与消费的集成案例到这里就结束了,我们下期见。。。。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/131881.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JAVA设计模式8:装饰模式,动态地将责任附加到对象上,扩展对象的功能

作者主页:Designer 小郑 作者简介:3年JAVA全栈开发经验,专注JAVA技术、系统定制、远程指导,致力于企业数字化转型,CSDN博客专家,阿里云社区专家博主,蓝桥云课讲师。 目录 一、什么是装饰模式二、…

春秋云镜 CVE-2015-1427

春秋云镜 CVE-2015-1427 ElasticSearch RCE 靶标介绍 ElasticSearch RCE 启动场景 漏洞利用 因查询时至少要求es中有一条数据,所以发送如下数据包,增加一个数据: POST /website/blog/ HTTP/1.1 Host: eci-2zedttamjkr80i9iubel.cloudeci…

VMware ubuntu空间越用越大

前言 用Ubuntu 1604编译了RK3399的SDK,之后删了一些多余的文件,df - h 已用21G,但window硬盘上还总用了185GB,采用了碎片整理,压缩无法解决 1 启动Ubuntu后, 安装 VMware Tools(T) 、 2 打开ubuntu终端,压…

Revit SDK 介绍:Ribbon 界面

前言 Revit 通过 API 将完整的 Ribbon 做了保留,同时这些菜单按钮也可以和相应的命令绑定。 内容 运行效果如下所示: 菜单特写: Ribbon Sample 整体是 API 暴露出来的一个 RibbonPanel,对应的接口: namespace Au…

FPGA GTH aurora 8b/10b编解码 PCIE 视频传输,提供2套工程源码加QT上位机源码和技术支持

目录 1、前言免责声明 2、我这里已有的 GT 高速接口解决方案3、GTH 全网最细解读GTH 基本结构GTH 发送和接收处理流程GTH 的参考时钟GTH 发送接口GTH 接收接口GTH IP核调用和使用 4、设计思路框架视频源选择silicon9011解码芯片配置及采集动态彩条视频数据组包GTH aurora 8b/10…

Java native 关键字

如你在看 JDK 的源代码的时候,大概率会看到很多方法使用了 native 关键字。 下面是 String 对象 JDK 中的源代码,就带有了一个 native 关键字。 native 是干什么用的 简单来说就是 Java 的 native 方法的实现不是用 Java 实现的,可能在其他…

Flutter 中的单元测试:从工作流基础到复杂场景

对 Flutter 的兴趣空前高涨——而且早就应该出现了。 Google 的开源 SDK 与 Android、iOS、macOS、Web、Windows 和 Linux 兼容。单个 Flutter 代码库支持所有这些。单元测试有助于交付一致且可靠的 Flutter 应用程序,通过在组装之前先发制人地提高代码质量来确保不…

数据结构与算法(二)——前缀、中缀、后缀表达式

一、前缀表达式(波兰表达式) 1.1、计算机求值 从右至左扫描表达式,遇到数字时,将数字压入堆栈。遇到运算符时,弹出栈顶的两个数,用运算符对它们做相应的计算(栈顶元素 和 次顶元素&#xff09…

Navicat导入Excel数据顺序变了

项目场景: Navicat导入Excel数据 问题描述 从Excel表格中导入数据到数据库中。但是,在导入的过程中,我们常会发现数据顺序出现了问题,导致数据错位,给数据的处理带来了极大的麻烦。 原因分析: 这个问题的…

mybatisplus配置拦截器实现保存加密,输出解密,模糊查询

前言:因公司需求需要把某些实体类的某些字段值进行加密保存,在查询时解密明文输出。现记录两种方式。 一、第一种方式: (1)使用TableField(typeHandler TypeHandler.class)注解自带的字段类型处理器,写一…

电脑死机的时候,CPU到底在做什么?

电脑死机,应该每个接触计算机的小伙伴都经历过吧。 尤其是早些年,电脑配置还没现在这么高的时候,多开几个重量级应用程序,死机就能如约而至,就算你把键盘上的CTRLALTDELETE按烂了,任务管理器也出不来&…

Mybatis-Genertor逆向工程

1、导入mybaties插件 <build><plugins><plugin><groupId>org.mybatis.generator</groupId><artifactId>mybatis-generator-maven-plugin</artifactId><version>1.4.2</version><dependencies><dependency>…

Error: svn: E155004: Run ‘svn cleanup‘ to remove locks

解决办法如下&#xff1a;点击settings 点击清除缓存按钮&#xff0c;然后再使用svn进行提交更新操作&#xff0c;但是可能还会有其它的错误&#xff0c;比如svn: E230001: Server SSL certificate verification failed&#xff0c;解决这个错误请参考我另一篇文章&#xff1a;…

博客系统(升级(Spring))(一)创建数据库,创建实例化对象,统一数据格式,统一报错信息

博客系统&#xff08;一&#xff09; 博客系统一、创建项目二、建立数据库结构链接服务器和数据库和Redis 三、创建实例化对象四、统一数据结构结构 五、统一报错信息 博客系统 博客系统是干什么的&#xff1f; CSDN就是一个典型的博客系统。而我在这里就是通过模拟实现一个博…

Python+Requests+Excel接口测试实战

1、EXCEL文件接口保存方式&#xff0c;如图。 2、然后就是读取EXCEL文件中的数据方法&#xff0c;如下&#xff1a; 1 import xlrd2 3 4 class readExcel(object):5 def __init__(self, path):6 self.path path7 8 property9 def getSheet(self): 10 …

莫比乌斯召回系统介绍

当前召回系统只能召回相关性高的广告&#xff0c;但不能保证该广告变现能力强。莫比乌斯做了如下两点创新&#xff1a; 在召回阶段&#xff0c;引入CPM等业务指标作为召回依据在召回阶段&#xff0c;引入CTR模型&#xff0c;从而召回更多相关性高且变现能力强的广告 参考 百度…

基于Protege的知识建模实战

一.Protege简介、用途和特点 1.Protege简介 Protege是斯坦福大学医学院生物信息研究中心基于Java开发的本体编辑和本体开发工具&#xff0c;也是基于知识的编辑器&#xff0c;属于开放源代码软件。这个软件主要用于语义网中本体的构建&#xff0c;是语义网中本体构建的核心开发…

Elasticsearch:什么是生成式人工智能?

生成式人工智能定义 给学生的解释&#xff08;基本&#xff09;&#xff1a; 生成式人工智能是一种可以创造新的原创内容的技术&#xff0c;例如艺术、音乐、软件代码和写作。 当用户输入提示时&#xff0c;人工智能会根据从互联网上现有示例中学到的知识生成响应&#xff0c;…

linux安装Sentinal1.8.6

前言&#xff1a; 使用docker search sentinel-dashboard命令&#xff0c;发现docker中的镜像版本过低&#xff0c;由于要配合使用1.8.6&#xff0c;所以这里采用java后台运行sentinel1.8.6-jar的方式。 1、官网下载对应版本jar&#xff08;https://github.com/alibaba/Sentin…

MySQL间隙锁深入分析

概念 什么是间隙锁&#xff1f; MySQL的间隙锁&#xff08;gap lock&#xff09;是一种锁定相邻数据间隔的机制。 触发时机&#xff1f; 当使用SELECT…FOR UPDATE或UPDATE语句时&#xff0c;MySQL会获取一个范围锁&#xff0c;包括指定条件内的所有数据行&#xff0c;并且还…