PiflowX组件-ReadFromKafka

ReadFromKafka组件

组件说明

从kafka中读取数据。

计算引擎

flink

有界性

Unbounded

组件分组

kafka

端口

Inport:默认端口

outport:默认端口

组件属性

名称展示名称默认值允许值是否必填描述例子
kafka_hostKAFKA_HOST“”逗号分隔的Kafka broker列表。127.0.0.1:9092
topicTOPIC“”读取数据的topic名。亦支持用分号间隔的topic列表,如 ‘topic-1;topic-2’。" "注意,‘topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。topic-1
topic_patternTOPIC_PATTERN“”匹配读取topic名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的topic都将被Kafka consumer订阅。注意,‘topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。topic1_*
startup_modeSTARTUP_MODE“”Set(“earliest-offset”, “latest-offset”, “group-offsets”, “timestamp”, “specific-offsets”)Kafka consumer 的启动模式。earliest-offset
schemaSCHEMA“”Kafka消息的schema信息。id:int,name:string,age:int
formatFORMAT“”Set(“json”, “csv”, “avro”, “parquet”, “orc”, “raw”, “protobuf”,“debezium-json”, “canal-json”, “maxwell-json”, “ogg-json”)用来反序列化Kafka消息的格式。注意:该配置项和 ‘value.format’ 二者必需其一。json
groupGROUP“”Kafka source的消费组id。如果未指定消费组ID,则会使用自动生成的"KafkaSource-{tableIdentifier}"作为消费组ID。group_1
propertiesPROPERTIES“”Kafka source连接器其他配置

ReadFromKafka示例配置

{"flow": {"name": "DataGenTest","uuid": "1234","stops": [{"uuid": "0000","name": "DataGen1","bundle": "cn.piflow.bundle.flink.common.DataGen","properties": {"schema": "[{\"filedName\":\"id\",\"filedType\":\"INT\",\"kind\":\"sequence\",\"start\":1,\"end\":10000},{\"filedName\":\"name\",\"filedType\":\"STRING\",\"kind\":\"random\",\"length\":15},{\"filedName\":\"age\",\"filedType\":\"INT\",\"kind\":\"random\",\"max\":100,\"min\":1}]","count": "100","ratio": "5"}},{"uuid": "1111","name": "WriteToKafka1","bundle": "cn.piflow.bundle.flink.kafka.WriteToKafka","properties": {"kafka_host": "hadoop01:9092","topic": "test","schema": "","format": "json","properties": "{}"}},{"uuid": "2222","name": "ReadFromKafka1","bundle": "cn.piflow.bundle.flink.kafka.ReadFromKafka","properties": {"kafka_host": "hadoop01:9092","topic": "test","group": "test","startup_mode": "earliest-offset","schema": "id:int,name:string,age:int","format": "json","properties": "{}"}},{"uuid": "3333","name": "ShowData1","bundle": "cn.piflow.bundle.flink.common.ShowData","properties": {"showNumber": "5000"}}],"paths": [{"from": "DataGen1","outport": "","inport": "","to": "WriteToKafka1"},{"from": "WriteToKafka1","outport": "","inport": "","to": "ReadFromKafka1"},{"from": "ReadFromKafka1","outport": "","inport": "","to": "ShowData1"}]}
}
示例说明

本示例演示了通过DataGen组件生成id,name,age3个字段100条数据,每秒生成5条数据,通过WriteToKafka组件将数据写入到kafka的test topic中,然后通过ReadFromKafka组件从test topic中读取数据,最后使用ShowData组件将数据打印在控制台。

字段描述
[{       "filedName": "id","filedType": "INT","kind": "sequence","start": 1,"end": 10000},{       "filedName": "name","filedType": "STRING","kind": "random","length": 15},{       "filedName": "age","filedType": "INT","kind": "random","max": 100,"min": 1} 
]

1.id字段

id字段类型为INT,使用sequence生成器,序列生成器的起始值为1,结束值为10000.

2.name字段

name字段类型为STRING,使用random生成器,生成字符长度为15。

3.age字段

age字段类型为INT,使用random生成器,随机生成器的最小值为1,最大值为100。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/227081.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

概率论基础复习题

一、填空题 二、选择题 答案:B 答案:C 答案:C 答案:D。统计量不含任何未知参数。 答案:A 答案:C 样本均值是总体均值的无偏估计;样本方差是总体方差的无偏估计。 答案:B。统计值是一…

机器学习的一般步骤

机器学习专注于让机器从大量的数据中模拟人类思考和归纳总结的过程,获得计算模型并自动判断和推测相应的输出结果。机器学习的一般步骤可以概括为以下几个阶段: 数据收集和准备: 收集与问题相关的数据,并确保数据的质量和完整性。…

熊猫目标检测数据集VOC格式1200张

熊猫是中国的国宝,也是世界上最受人喜爱的动物之一。熊猫以其独特的外貌和与生俱来的文化象征意义而闻名于世。它们是一种大型的食草动物,主要分布在中国中部地区的竹林和高山地带。 熊猫的身形圆润笨拙,黑白分明,拥有圆润的脸庞…

【openlayers-3】加载图标

在OpenLayer3中添加图标有两种方式&#xff0c;一种是通过overlay方式&#xff0c;另一种是通过Feature Style的方式。 1、通过overlay方式添加 <div id"mapCon" style"width: 100%; height: 100%; position: absolute;"></div> <div id…

GPT系列概述

OPENAI做的东西 Openai老窝在爱荷华州&#xff0c;微软投资的数据中心 万物皆可GPT下咱们要失业了&#xff1f; 但是世界不仅仅是GPT GPT其实也只是冰山一角&#xff0c;2022年每4天就有一个大型模型问世 GPT历史时刻 GPT-1 带回到2018年的NLP 所有下游任务都需要微调&#x…

关于java循环结构for

关于java循环结构for 在上一篇文章中&#xff0c;我们了解到了while和do…while的结构以及用法&#xff0c;这篇文章我们主要学习一下最常用的循环结构&#xff0c;for结构&#x1f600;&#xff0c;这个结构理解起来相对while结构会难一些&#xff0c;本篇文章内容会很多&…

Linux LVM逻辑卷

一、LVM的定义 LVM 是 Logical Volume Manager 的简称&#xff0c;译为中文就是逻辑卷管理。它是 Linux 下对硬盘分区的一种管理机制。LVM 适合于管理大存储设备&#xff0c;并允许用户动态调整文件系统的大小。此外&#xff0c;LVM 的快照功能可以帮助我们快速备份数据。LVM 为…

L1-072:刮刮彩票

题目描述 刮刮彩票”是一款网络游戏里面的一个小游戏。如图所示&#xff1a; 每次游戏玩家会拿到一张彩票&#xff0c;上面会有 9 个数字&#xff0c;分别为数字 1 到数字 9&#xff0c;数字各不重复&#xff0c;并以 33 的“九宫格”形式排布在彩票上。 在游戏开始时能看见一个…

【ArkTS入门】ArkTS开发初探:语言特点和开发特点

什么是ArkTS&#xff1f; ArkTS是一个为鸿蒙组件而生的框架&#xff0c;语法亲人好用。基于TypeScript&#xff0c;ArkTS拓展了声明式UI、状态管理等的能力&#xff0c;从本质上来讲&#xff0c;是TypeScript的扩展&#xff0c;主要服务于前端。 ArkTS的开发可以满足“一次开…

Python列表的介绍与操作 增改查,连接,赋值,复制,清空

列表 在日常中我们通过给变量赋值来存储数据,比如 a "hello" b "world" c "你好啊" d "....."由于变量一次只能存储一个数据,但我们如果想一次存储多个数据,的话这样存储会很复杂,所以,我们可以通过列表 列表(List)是Python中的…

Collector收集器的高级用法

Collectors收集器的高级用法 场景1&#xff1a;获取关联的班级名称 原先如果需要通过关联字段拿到其他表的某个字段&#xff0c;只能遍历List匹配获取 for (Student student : studentList) {Long clazzId student.getClazzId();// 遍历班级列表&#xff0c;获取学生对应班级…

隐身之术:深入解析代理模式的神秘力量

一、定义 代理模式&#xff08;Proxy Pattern)为其他对象提供一种代理以控制对这个对象的访问,属于结构型模式。 二、解决什么问题 主要解决在直接访问对象时带来的问题&#xff0c;比如说&#xff1a;要访问的对象在远程的机器上。在面向对象系统中&#xff0c;有些对象由于…

一篇文章深入认识微服务SpringCloud和Dubbo的区别

1、SpringCloud是什么 SpringCloud, 基于SpringBoot提供了一套微服务解决方案&#xff0c;包括服务注册与发现&#xff0c;配置中心&#xff0c;全链路监控&#xff0c;服务网关&#xff0c;负载均衡&#xff0c;熔断器等组件&#xff0c;除了基于NetFlix的开源组件做高度抽象…

Conda:Python环境管理的瑞士军刀

在数据科学和机器学习的世界中&#xff0c;管理各种库和依赖关系的重要性不容忽视。Conda 就是为此而生的强大工具。本文将深入探讨 Conda 的简介、功能以及使用示例&#xff0c;帮助你更好地理解和使用这个工具。 Conda 简介 Conda 是一个开源的包管理系统和环境管理系统&am…

新品出击 | 软网关BLIoTLink免费发布

新品出击|软网关BLIoTLink免费发布 BLIoTLink是一款免费的物联网协议转换软件&#xff0c;可以部署在任何基于Linux OS的系统&#xff08;Linux、Debian、Ubuntu、FreeRTOS、RT-Thread&#xff09;中&#xff0c;使用灵活&#xff0c;可以实现数据的采集以及接入网络平台。 BL…

0开始配置Cartographer建图和导航定位

0开始配置Cartographer 日期&#xff1a;12-19 硬件&#xff1a;激光雷达IMU 小车的tf变换&#xff1a; 建图配置 lua文件配置&#xff1a;my_robot.lua include "map_builder.lua" include "trajectory_builder.lua"options {map_builder MAP_BUILDE…

HarmonyOS page生命周期函数讲解

下面 我们又要看一个比较重要的点了 页面生命周期 页面组件有三个生命周期 onPageShow 页面显示时触发 onPageHide 页面隐藏时触发 onBackPress 页面返回时触发 这里 我们准备两个组件 首先是 index.ets 参考代码如下 import router from ohos.router Entry Component struc…

Python FastApi连接oracle进行查询

这边技术选型是cx_oracle进行连接查询&#xff0c;cx_oracle的使用首先要有官方的客户端才能连接到数据库&#xff0c;python并不自带客户端。我用是Python3.9 安装客户端 可以到官网在选择最新版进行下载。 Instant Client for Microsoft Windows (x64) 64-bit 或者直接从我…

HBase深度历险 | 京东物流技术团队

简介 HBase 的全称是 Hadoop Database&#xff0c;是一个分布式的&#xff0c;可扩展&#xff0c;面向列簇的数据库&#xff0c;是一个通过大量廉价的机器解决海量数据的高速存储和读取的分布式数据库解决方案。本文会像剥洋葱一样&#xff0c;层层剥开她的心。 特点 首先我…

【机器学习】深度学习概论(二)

五、受限玻尔兹曼机&#xff08;Restricted Boltzmann Machine&#xff0c;RBM&#xff09; 5.1 RBM介绍 示例代码&#xff1a; Python 编写了一个简单的 RBM 实现&#xff0c;并用一些假数据训练了它。然后&#xff0c;他展示了如何用 RBM 来解释用户的电影偏好&#xff0c;以…