Pulsar3.2 Function的介绍与使用

概念

Function 步骤

Pulsar Functions是运行在Pulsar上面的计算框架,输入和输出都是基于Pulsar的Topic。通过使用Function可以对进入Pulsar集群的消息进行简单的清洗、计算,这样不仅避免额外部署单独的流处理引擎(SPE),最大限度的提高开发/维护人员的工作效率。下面是function计算的步骤
在这里插入图片描述

  1. 接收来自一个或多个Toipc的消息
  2. 将处理逻辑应用于每一条接收到的消息
  3. 将结果进行输出,同时将日志输出到Log Topic以及将状态更新写入到BookKeeper

Function instance

每个function都有属于自己的全限定名,简称 FQFN,例如:tenant/namespace/name,不同限定名下的哪怕是同名的function是彼此互相隔离的。Function在启动时都会创建对应的Function instance,这是function执行框架的核心,它包含以下内容

  1. 多个从不同Topic消费消息的消费者
  2. 一个调用function逻辑的执行器executor
  3. 一个发送处理结果到Topic的生产者
    在这里插入图片描述

每个function都可以启动多个instance,每个instance执行的都是function完整的计算逻辑,咱们可以在配置文件中指定instance的启动数量。function启动的时候可以将FQFN作为订阅名,这样方便Pulsar基于订阅类型进行负载均衡。每个function都有一个单独的状态存储,我们可以通过状态接口持久化中间结果在BookKeeper,其他用户可以通过状态接口进行查询。

Function worker

Function worker是一个集监控、编排、执行单个function于一体的逻辑组件。在worker中,每个function都可以作为线程或进程执行,具体取决于所选的配置。
在这里插入图片描述
具体流程如下

  1. 用户向 REST 服务器发送执行函数实例的请求。
  2. REST 服务器响应请求并将请求传递给函数元数据管理器。
  3. 函数元数据管理器将请求更新写入函数元数据主题。它还会跟踪所有与元数据相关的消息,并使用函数元数据主题来持久化函数的状态更新。
  4. 函数元数据管理器从函数元数据主题读取更新,并触发日程管理器计算分配。
  5. 日程管理器将赋值更新写入赋值主题。
  6. 函数运行时管理器监听任务分配主题,读取任务分配更新,并更新其内部状态,该状态包含所有工作者的所有任务分配的全局视图。如果更新改变了某个工作者的分配,函数运行时管理器就会通过启动或停止函数实例的执行来实现新的分配。
  7. 成员管理器要求协调主题选出一名领导工人。所有 Worker 都会以故障转移订阅的方式订阅协调主题,但活跃的 Worker 会成为领导者并执行分配,从而保证该主题只有一个活跃的消费者。
  8. 成员管理器从协调主题读取更新。

使用

前置动作

  1. 配置开启function conf/standalone.conf
functionsWorkerEnabled=true
  1. 启动集群
pulsar standalone
  1. 检查pulsar端口
telnet localhost 6650
  1. 检查function集群
pulsar-admin functions-worker get-cluster

输出如下
在这里插入图片描述

  1. 确保租户和命名空间存在
pulsar-admin tenants list
pulsar-admin namespaces list public

在这里插入图片描述

  1. 创建测试tenant以及namespace
pulsar-admin tenants create test
pulsar-admin namespaces create test/test-namespace
  1. 检查
pulsar-admin namespaces list test

1. 基于配置文件启动function

  1. 启动example样例function
pulsar-admin functions create \--function-config-file examples/example-function-config.yaml \--jar examples/api-examples.jar

可以查看examples/example-function-config.yaml配置

tenant: "test"
namespace: "test-namespace"
name: "example" # function name
className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
inputs: ["test_src"] # this function will read messages from these topics
output: "test_result" # the return value of this function will be sent to this topic
autoAck: true # function will acknowledge input messages if set true
parallelism: 1

可以通过源码查看ExclamationFunction的逻辑,搭配配置可以看到其实就是读取Topic test_src中的数据原封不动的写到Topic test_result

/*** The classic Exclamation Function that appends an exclamation at the end* of the input.*/
public class ExclamationFunction implements Function<String, String> {@Overridepublic String process(String input, Context context) {return String.format("%s!", input);}
}
  1. 读取example function信息
pulsar-admin functions get \--tenant test \--namespace test-namespace \--name example
  1. 查询example function的执行信息
pulsar-admin functions status \--tenant test \--namespace test-namespace \--name example
  1. 启动消费者监听Topic test_result
pulsar-client consume -s test-sub -n 0 test_result
  1. 往Topic test_src中写入数据
pulsar-client produce -m "test-messages-`date`" -n 10 test_src

可以看到监听到消息,说明example function正常工作
在这里插入图片描述

2. 启动有状态的function

  1. 启动function
pulsar-admin functions create \--function-config-file examples/example-stateful-function-config.yaml \--jar examples/api-examples.jar

可以查看examples/example-stateful-function-config.yaml配置

tenant: "test"
namespace: "test-namespace"
name: "word_count"
className: "org.apache.pulsar.functions.api.examples.WordCountFunction"
inputs: ["test_wordcount_src"] # this function will read messages from these topics
autoAck: true
parallelism: 1

可以通过源码查看WordCountFunction的逻辑,可以看到会根据输入进行分词统计

/*** The classic word count example done using pulsar functions* Each input message is a sentence that split into words and each word counted.* The built in counter state is used to keep track of the word count in a* persistent and consistent manner.*/
public class WordCountFunction implements Function<String, Void> {@Overridepublic Void process(String input, Context context) {Arrays.asList(input.split("\\s+")).forEach(word -> context.incrCounter(word, 1));return null;}
}
  1. 查询状态,可以看到输出是 key 'hello' doesn't exist.
pulsar-admin functions querystate \--tenant test \--namespace test-namespace \--name word_count -k hello -w
  1. 生产数据
pulsar-client produce -m "hello" -n 10 test_wordcount_src

执行两次查询状态可以看到 numberValue为 20,说明状态是会累加的
在这里插入图片描述

3. 启动窗口function

  1. 启动窗口function
pulsar-admin functions create \--function-config-file examples/example-window-function-config.yaml \--jar examples/api-examples.jar

可以通过查看配置 examples/example-window-function-config.yaml

tenant: "test"
namespace: "test-namespace"
name: "window-example"
className: "org.apache.pulsar.functions.api.examples.AddWindowFunction"
inputs: ["test_window_src"]
output: "test_window_result"
autoAck: true
parallelism: 1# every 5 messages, calculate sum of the latest 10 messages
windowConfig:windowLengthCount: 10slidingIntervalCount: 5

可以通过源码查看AddWindowFunction的逻辑,可以看到会根据输入的数据进行累加。结合配置可以看到会累加最近10条消息

/*** Example Function that acts on a window of tuples at a time rather than per tuple basis.*/
@Slf4j
public class AddWindowFunction implements Function <Collection<Integer>, Integer> {@Overridepublic Integer apply(Collection<Integer> integers) {return integers.stream().reduce(0, (x, y) -> x + y);}
}

4. 启动自定义function

  1. 基于Function接口实现自己的逻辑,打包编译成jar包
/*** author: shalock.lin* date: 2024/2/18* describe:*/
public class FormatDateFunction implements Function<String, String> {private SimpleDateFormat format1 = new SimpleDateFormat("yyyy/MM/dd HH/mm/ss");private SimpleDateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Overridepublic String process(String input, Context context) throws Exception {Date date = format1.parse(input);return format2.format(date);}
}
  1. 基于该jar包启动function
pulsar-admin functions create \--tenant public \--namespace default \--name sherlock-manager-pulsar--inputs test1-input-topic \--output persistent://public/default/test1-output-topic \--classname com.sherlock.functions.FormatDateFunction \--jar examples/sherlock-manager-pulsar-1.0-SNAPSHOT.jar
  1. 启动客户端监听test1-output-topic
pulsar-client consume -s test-sub1 -n 0 persistent://public/default/test1-output-topic
  1. 往Topic test1-input-topic 里写入数据
pulsar-client produce -m "2024/02/28 18/36/25" persistent://public/default/test1-input-topic

消费者打印

2024-02-28 18:36:25

启动function有时候会遇到下面这个问题,一般都是function命名冲突导致的,修改yaml配置里name在重新create下就可以了
在这里插入图片描述

参考文献

  1. https://pulsar.apache.org/docs/3.2.x/functions-overview/

  2. https://pulsar.apache.org/docs/next/admin-api-functions/

  3. https://blog.csdn.net/qian1314520hu/article/details/133925694

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/265591.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【力扣hot100】刷题笔记Day14

前言 又是新的一周&#xff0c;快乐的周一&#xff0c;快乐地刷题&#xff0c;今天把链表搞完再干活&#xff01; 114. 二叉树展开为链表 - 力扣&#xff08;LeetCode&#xff09; 前序遍历 class Solution:def flatten(self, root: Optional[TreeNode]) -> None:if not r…

Bicycles(变形dijkstra,动态规划思想)

Codeforces Round 918 (Div. 4) G. Bicycles G. Bicycles 题意&#xff1a; 斯拉夫的所有朋友都打算骑自行车从他们住的地方去参加一个聚会。除了斯拉维奇&#xff0c;他们都有一辆自行车。他们可以经过 n n n 个城市。他们都住在城市 1 1 1 &#xff0c;想去参加位于城市…

【Java程序员面试专栏 算法思维】四 高频面试算法题:回溯算法

一轮的算法训练完成后,对相关的题目有了一个初步理解了,接下来进行专题训练,以下这些题目就是汇总的高频题目,本篇主要聊聊回溯算法,主要就是排列组合问题,所以放到一篇Blog中集中练习 题目关键字解题思路时间空间全排列回溯算法【元素无重不可复选】构造全排列树,用使…

kafka三节点集群平滑升级过程指导

一、前言 Apache Kafka作为常用的开源分布式流媒体平台&#xff0c;可以实时发布、订阅、存储和处理数据流,多用于作为消息队列获取实时数据&#xff0c;构建对数据流的变化进行实时反应的应用程序&#xff0c;已被数千家公司用于高性能数据管道、流分析、数据集成和任务关键型…

Redis String 类型底层揭秘

目录 前言 String 类型低层数据结构 节省内存的数据结构 前言 Redis 的 string 是个 “万金油” &#xff0c;这么评价它不为过. 它可以保存Long 类型整数&#xff0c;字符串&#xff0c; 甚至二进制也可以保存。对于key&#xff0c;value 这样的单值&#xff0c;查询以及插…

详解Kotlin中run、with、let、also与apply的使用和区别

Kotlin作为一种现代、静态类型的编程语言&#xff0c;不仅提供了丰富的特性&#xff0c;还提供了极具表现力的函数&#xff1a;run, with, let, also, 和 apply。理解这些函数的不同之处对于编写高效、易于维护的代码至关重要。 函数对比表 函数对象引用返回值使用场景runthi…

猜数游戏(个人学习笔记黑马学习)

案例需求 定义一个数字(1~10&#xff0c;随机产生)&#xff0c;通过3次判断来猜出来数字 案例要求&#xff1a; 1.数字随机产生&#xff0c;范围1-10 2.有3次机会猜测数字&#xff0c;通过 3.层嵌套判断实现每次猜不中&#xff0c;会提示大了或小了 提示&#xff0c;通过如下代…

【海贼王的数据航海:利用数据结构成为数据海洋的霸主】链表—单链表

目录 1 -> 链表 1.1 -> 链表的概念及结构 1.2 -> 链表的分类 2 -> 无头单向非循环链表(单链表) 2.1 -> 接口声明 2.2 -> 接口实现 2.2.1 -> 动态申请一个结点 2.2.2 -> 单链表的打印 2.2.3 -> 单链表的尾插 2.2.4 -> 单链表的头插 2.…

消息中间件篇之RabbitMQ-消息不丢失

一、生产者确认机制 RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后&#xff0c;会返回一个结果给发送者&#xff0c;表示消息是否处理成功。 当消息没有到交换机就失败了&#xff0c;就会返回publish-confirm。当消息没有到达MQ时&…

2.27数据结构

1.链队 //link_que.c #include "link_que.h"//创建链队 Q_p create_que() {Q_p q (Q_p)malloc(sizeof(Q));if(qNULL){printf("空间申请失败\n");return NULL;}node_p L(node_p)malloc(sizeof(node));if(LNULL){printf("申请空间失败\n");return…

DETR(1):论文详解

文章目录 1. DETR 模型结构2.损失函数2.1 预测结果和GT 的匹配2.2 训练的loss计算3.实验3.1 大物体表现效果好3.2 Transformer Encoder 和Decoder的作用3.3 object query4. 伪代码5. 结论

【《高性能 MySQL》摘录】第 2 章 MySQL 基准测试

文章目录 2.1 为什么需要基准测试2.2 基准测试的策略2.2.1 测试何种指标 2.3 基准测试方法2.3.1 设计和规划基准测试2.3.2 基准测试应该运行多长时间2.3.3 获取系统性能和状态2.3.4 获得准确的测试结果2.3.5 运行基准测试并分析结果2.3.6 绘图的重要性 2.4 基准测试工具…

IntelliJ IDEA 2023:创新不止步,开发更自由 mac/win版

IntelliJ IDEA 2023激活版是一款强大而智能的集成开发环境(IDE)&#xff0c;为开发者提供了一系列先进的功能和工具&#xff0c;帮助他们更高效地编写、调试和测试代码。 IntelliJ IDEA 2023 软件获取 IntelliJ IDEA 2023继承了其前代版本的优秀基因&#xff0c;并在此基础上进…

2月28日代码随想录二叉搜索树中的众数

摸了一个寒假了&#xff0c;得赶一赶了 251.二叉搜索树中的众数 给你一个含重复值的二叉搜索树&#xff08;BST&#xff09;的根节点 root &#xff0c;找出并返回 BST 中的所有 众数&#xff08;即&#xff0c;出现频率最高的元素&#xff09;。 如果树中有不止一个众数&am…

虚拟机JVM

虚拟机 1、定义jvm 假想计算机 运行在操作系统之上 和硬件之间没有直接交互 包括 一套字节码指令、寄存器、栈、垃圾回收、堆 一个存储方法域 jvm:承担一个翻译工作&#xff0c;动态的将java代码编译成操作系统可以识别的机器码。 从软件层面屏蔽了不同操作系统在底层硬件与指…

查看cuda和cudnn版本

查看cuda 打开命令提示符&#xff08;Windows键 R&#xff0c;然后输入cmd并回车&#xff09;。输入nvcc --version或者nvcc -V来获取Cuda的版本信息。 查看cudnn版本 查看Cudnn版本&#xff1a; 进入Cuda安装目录&#xff0c;通常位于C:\Program Files\NVIDIA GPU Computi…

Doris——荔枝微课统一实时数仓建设实践

目录 一、业务介绍 二、早期架构及痛点 2.1 早期架构 2.2 架构痛点 三、技术选型 四、新的架构及方案 五、搭建经验 5.1 数据建模 5.2 数据开发 5.3 库表设计 5.4 数据管理 5.4.1 监控告警 5.4.2 数据备份与恢复 六、收益总结 七、未来规划 原文大佬这篇Doris腾…

STM32 Cubemx配置SPI编程(使用Flash模块)

文章目录 前言一、W25Q64模块介绍二、STM32Cubemx配置SPI三、SPI HAL库操作函数分析3.1查询方式3.2中断方式 四、Flash时序分析1.读器件ID指令2.写使能3.擦除扇区4.页编程5.读数据6.读状态寄存器 五、Flash驱动程序编写1.代码编写测试 总结 前言 本篇文章来为大家讲解一下Flas…

安装极狐GitLab Runner并测试使用

本文继【新版极狐安装配置详细版】之后继续 1. 添加官方极狐GitLab 仓库&#xff1a; 对于 RHEL/CentOS/Fedora&#xff1a; curl -L "https://packages.gitlab.com/install/repositories/runner/gitlab-runner/script.rpm.sh" | sudo bash2. 安装最新版本的极狐G…

STM32 4位数码管和74HC595

4位数码管 在使用一位数码管的时候&#xff0c;会用到8个IO口&#xff0c;那如果使用4位数码管&#xff0c;难道要使用32个IO口吗&#xff1f;肯定是不行的&#xff0c;太浪费了IO口了。把四个数码管全部接一起共用8个IO口&#xff0c;然后分别给他们一个片选。所以4位数码管共…