Kafka集成flume

1.flume作为生产者集成Kafka

        kafka作为flume的sink,扮演消费者角色

     1.1 flume配置文件

        vim $kafka/jobs/flume-kafka.conf

# agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1 c2# Describe/configure the source
a1.sources.r1.type = TAILDIR
#记录最后监控文件的断点的文件,此文件位置可不改
a1.sources.r1.positionFile =  /export/server/flume/job/data/tail_dir.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /export/server/flume/job/data/.*file.*
a1.sources.r1.filegroups.f2 =/export/server/flume/job/data/.*log.*# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = customers
a1.sinks.k1.kafka.bootstrap.servers =node1:9092,node2:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

1.2开启flume监控

flume-ng agent -n a1 -c conf/ -f /export/server/kafka/jobs/kafka-flume.conf

1.3开启Kafka消费者

kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092 --topic consumers --from-beginning

1.4生产数据

往被监控文件输入数据

[ljr@node1 data]$echo hello >>file2.txt 
[ljr@node1 data]$ echo ============== >>file2.txt 

查看Kafka消费者

可见Kafka集成flume生产者成功。

2.flume作为消费者集成Kafka

        kafka作为flume的source,扮演生产者角色

2.1flume配置文件

vim $kafka/jobs/flume-kafka.conf


# agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
#注意不要大于channel transactionCapacity的值100
a1.sources.r1.batchSize = 50 
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers =node1:9092, node1:9092
a1.sources.r1.kafka.topics = consumers
a1.sources.r1.kafka.consumer.group.id = custom.g.id# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
#注意transactionCapacity的值不要小于sources batchSize的值50
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.2开启flume监控

flume-ng agent -n a1 -c conf/ -f /export/server/kafka/jobs/kafka-flume1.conf

2.3开启Kafka生产者并生产数据

kafka-console-producer.sh --bootstrap-server node1:9092,node2:9092 --topic consumers

 

查看flume监控台

可见Kafka集成flume消费者成功。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/352760.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue2基础:.sync修饰符的使用,认识,作用,本质案例演示,实现父子之间的通信。

.sync的作用: 可以实现子组件与父组件数据的双向绑定,简化代码。 与v-model的不同点,prop属性名可以自定义,不要一定要用value. .sync的本质: 就是:属性名和update:属性名合写。 下面我们进行代码演示…

mellanox HCA IB网卡固件更新

注意事项: 1.如果PSID以SGN开头,说明该产品是曙光的OEM产品,可以向HPC高速网络部获取固件。如果PSID以MT开头,说明该产品是Mellanox或nvidia的标准产品,可以通过官网下载固件。 2.通过官网获取固件,一定要…

pytorch学习笔记7

getitem在进行索引取值的时候自动调用,也是一个魔法方法,就像列表索引取值那样,一个意思 import torchvision from torch.utils.data import DataLoaderdata_transformtorchvision.transforms.Compose([torchvision.transforms.ToTensor()] ) test_datatorchvision.datasets.C…

如何通过数据库与AI实现以图搜图?OceanBase向量功能详解

OceanBase支持向量数据库的基础能力 当前,数据库存储系统与人工智能技术的结合,可以体现在两个主要的应用方向上。 一、近似搜索。它利用大语言模型(LLM,简称大模型)的嵌入(embedding)技术&am…

解决外网404:清除DNS缓存并配置host主机使用知名公共DNS服务

在 Windows 上清除/刷新 DNS 缓存 对于所有Windows版本,清除DNS缓存的过程都是相同的。你需要使用管理员权限打开命令提示符并运行ipconfig /flushdns。 浏览器清除DNS缓存 大多数现代的Web浏览器都有一个内置的DNS客户端,以防止每次访问该网站时…

防止Selenium被检测 Google Chrome 125

背景 最近在使用selenium自动播放学习课程,相信大家也有一些类似的使用场景。 能自动化的事情,绝不自己干。 为防止被检测是机器人做题,刷视频,需要做一些小调整。 先来看作为服务方维护者,是如何检测是Selenium打…

深度神经网络修复策略综述

源自:软件学报 作者:梁震, 刘万伟, 吴陶然, 薛白, 王戟, 杨文婧 注:若出现无法显示完全的情况,可 V 搜索“人工智能技术与咨询”查看完整文章 摘 要 随着智能信息时代的发展, 深度神经网络在人类社会众多领域中的应用, 尤其是…

鸿蒙开发实战:灵活定制Tabs组件,实现个性化页签布局

闪客 沉默的闪客 2024-06-16 20:01 陕西 大家好,又一个项目已经基本完成 是一个元服务英语单词卡片项目,后面一步一步的进行分析拆解,今天来实现一个Tabs组件自定义界面开发。 鸿蒙ArkUI 开发的时候,Tabs 组件很常用,…

Vue项目中实现骨架占位效果-demo

创建组件 Skeleton.vue <template><div class"skeleton"><div class"skeleton-item" v-for"n in count" :key"n"></div></div> </template><script> export default {props: {count: {ty…

基于Matlab的细胞计数图像处理系统(GUI界面有报告) 【含Matlab源码 MX_003期】

简介&#xff1a; 本文旨在解决生物血细胞数目统计的挑战&#xff0c;提出了基于图像处理的综合方案。通过MATLAB平台&#xff0c;我们设计并实现了一套完整的细胞图像处理与分析流程。在预处理阶段&#xff0c;采用图像增强和阈值分割等方法&#xff0c;有效地提高了细胞图像的…

大型语言模型在AMD GPU上的推理优化

Large language model inference optimizations on AMD GPUs — ROCm Blogs 大型语言模型&#xff08;LLMs&#xff09;已经改变了自然语言处理和理解&#xff0c;促进了在多个领域中的众多人工智能应用。LLMs在包括AI助手、聊天机器人、编程、游戏、学习、搜索和推荐系统在内的…

这三款使用的视频、图片设计工具,提供工作效率

Videograp Videograp是一款专注于视频生成的工具&#xff0c;特别适合需要快速剪辑和编辑视频的用户。Videograp具备以下特点&#xff1a; 影音比例转换&#xff1a;Videograp支持调整视频的分辨率和比例&#xff0c;使其更适合不同的播放环境和设备。 AI快剪&#xff1a;该工…

Einops 张量操作快速入门

张量&#xff0c;即多维数组&#xff0c;是现代机器学习框架的支柱。操纵这些张量可能会变得冗长且难以阅读&#xff0c;尤其是在处理高维数据时。Einops 使用简洁的符号简化了这些操作。 Einops &#xff08;Einstein-Inspired Notation for operations&#xff09;&#xff…

第二篇: 掌握Docker的艺术:深入理解镜像、容器和仓库

掌握Docker的艺术&#xff1a;深入理解镜像、容器和仓库 1. 引言 1.1 简要介绍Docker的重要性 在当今快速发展的技术世界中&#xff0c;软件开发和部署的效率和可靠性是衡量成功的关键因素。Docker&#xff0c;作为一个开源的容器化平台&#xff0c;革新了软件的打包、分发和…

电致变色和电泳——有什么区别?

虽然电泳显示器和电致变色显示器都是反射显示器的示例&#xff0c;但其基础技术却截然不同。电致变色显示器采用超薄聚合物&#xff0c;可响应施加的电场而改变颜色。电场使电致变色材料发生化学氧化和还原。这种变化需要的能量很少&#xff0c;而且比较稳定&#xff0c;因此刷…

PostgreSQL性能优化之分区表 #PG培训

在处理大规模数据时&#xff0c;PostgreSQL的性能优化是一个非常重要的话题&#xff0c;其中分区表&#xff08;Partitioned Tables&#xff09;是提高查询和数据管理效率的重要手段。本文将详细介绍PostgreSQL分区表的概念、优势、创建与管理方法以及一些常见的优化策略。 #P…

课程设计——基于FPGA的交通红绿灯控制系统(源代码)

摘要&#xff1a; 本课程设计旨在设计一个基于FPGA&#xff08;现场可编程门阵列&#xff09;的交通红绿灯控制系统。该系统模拟了实际道路交叉口的红绿灯工作场景&#xff0c;通过硬件描述语言&#xff08;如Verilog或VHDL&#xff09;编写源代码实现。系统包含三个主要部分&a…

Servlet快速入门

Servlet Servlet(server applet)是运行在服务端(tomcat)的Java小程序,是sun公司提供的一套定义动态资源的规范,从代码层面讲servlet就是一个接口.用来接收-处理客户端请求,响应给浏览器的动态资源.在整个Web应用中,Servlet主要负责接收处理请求,协同调度功能以及响应数据,可以将…

数据结构-十大排序算法集合(四万字精讲集合)

前言 1&#xff0c;数据结构排序篇章是一个大的工程&#xff0c;这里是一个总结篇章&#xff0c;配备动图和过程详解&#xff0c;从难到易逐步解析。 2&#xff0c;这里我们详细分析几个具备教学意义和实际使用意义的排序&#xff1a; 冒泡排序&#xff0c;选择排序&#xff0c…

算法体系-19 第十九节 暴力递归到动态规划

一 动画规划的概念 优化出现重复解的递归 一旦写出递归来&#xff0c;改动态规划就很快 尝试策略和状态转移方程是一码事 学会尝试是攻克动态规划最本质的能力 如果你发现你有重复调用的过程&#xff0c;动态规划在算过一次之后把答案记下来&#xff0c;下回在越到重复调用过程…