概念
Function 步骤
Pulsar Functions是运行在Pulsar上面的计算框架,输入和输出都是基于Pulsar的Topic。通过使用Function可以对进入Pulsar集群的消息进行简单的清洗、计算,这样不仅避免额外部署单独的流处理引擎(SPE),最大限度的提高开发/维护人员的工作效率。下面是function计算的步骤
- 接收来自一个或多个Toipc的消息
- 将处理逻辑应用于每一条接收到的消息
- 将结果进行输出,同时将日志输出到Log Topic以及将状态更新写入到BookKeeper
Function instance
每个function都有属于自己的全限定名,简称 FQFN,例如:tenant/namespace/name,不同限定名下的哪怕是同名的function是彼此互相隔离的。Function在启动时都会创建对应的Function instance,这是function执行框架的核心,它包含以下内容
- 多个从不同Topic消费消息的消费者
- 一个调用function逻辑的执行器executor
- 一个发送处理结果到Topic的生产者
每个function都可以启动多个instance,每个instance执行的都是function完整的计算逻辑,咱们可以在配置文件中指定instance的启动数量。function启动的时候可以将FQFN作为订阅名,这样方便Pulsar基于订阅类型进行负载均衡。每个function都有一个单独的状态存储,我们可以通过状态接口持久化中间结果在BookKeeper,其他用户可以通过状态接口进行查询。
Function worker
Function worker是一个集监控、编排、执行单个function于一体的逻辑组件。在worker中,每个function都可以作为线程或进程执行,具体取决于所选的配置。
具体流程如下
- 用户向 REST 服务器发送执行函数实例的请求。
- REST 服务器响应请求并将请求传递给函数元数据管理器。
- 函数元数据管理器将请求更新写入函数元数据主题。它还会跟踪所有与元数据相关的消息,并使用函数元数据主题来持久化函数的状态更新。
- 函数元数据管理器从函数元数据主题读取更新,并触发日程管理器计算分配。
- 日程管理器将赋值更新写入赋值主题。
- 函数运行时管理器监听任务分配主题,读取任务分配更新,并更新其内部状态,该状态包含所有工作者的所有任务分配的全局视图。如果更新改变了某个工作者的分配,函数运行时管理器就会通过启动或停止函数实例的执行来实现新的分配。
- 成员管理器要求协调主题选出一名领导工人。所有 Worker 都会以故障转移订阅的方式订阅协调主题,但活跃的 Worker 会成为领导者并执行分配,从而保证该主题只有一个活跃的消费者。
- 成员管理器从协调主题读取更新。
使用
前置动作
- 配置开启function conf/standalone.conf
functionsWorkerEnabled=true
- 启动集群
pulsar standalone
- 检查pulsar端口
telnet localhost 6650
- 检查function集群
pulsar-admin functions-worker get-cluster
输出如下
- 确保租户和命名空间存在
pulsar-admin tenants list
pulsar-admin namespaces list public
- 创建测试tenant以及namespace
pulsar-admin tenants create test
pulsar-admin namespaces create test/test-namespace
- 检查
pulsar-admin namespaces list test
1. 基于配置文件启动function
- 启动example样例function
pulsar-admin functions create \--function-config-file examples/example-function-config.yaml \--jar examples/api-examples.jar
可以查看examples/example-function-config.yaml配置
tenant: "test"
namespace: "test-namespace"
name: "example" # function name
className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
inputs: ["test_src"] # this function will read messages from these topics
output: "test_result" # the return value of this function will be sent to this topic
autoAck: true # function will acknowledge input messages if set true
parallelism: 1
可以通过源码查看ExclamationFunction的逻辑,搭配配置可以看到其实就是读取Topic test_src中的数据原封不动的写到Topic test_result
/*** The classic Exclamation Function that appends an exclamation at the end* of the input.*/
public class ExclamationFunction implements Function<String, String> {@Overridepublic String process(String input, Context context) {return String.format("%s!", input);}
}
- 读取example function信息
pulsar-admin functions get \--tenant test \--namespace test-namespace \--name example
- 查询example function的执行信息
pulsar-admin functions status \--tenant test \--namespace test-namespace \--name example
- 启动消费者监听Topic test_result
pulsar-client consume -s test-sub -n 0 test_result
- 往Topic test_src中写入数据
pulsar-client produce -m "test-messages-`date`" -n 10 test_src
可以看到监听到消息,说明example function正常工作
2. 启动有状态的function
- 启动function
pulsar-admin functions create \--function-config-file examples/example-stateful-function-config.yaml \--jar examples/api-examples.jar
可以查看examples/example-stateful-function-config.yaml配置
tenant: "test"
namespace: "test-namespace"
name: "word_count"
className: "org.apache.pulsar.functions.api.examples.WordCountFunction"
inputs: ["test_wordcount_src"] # this function will read messages from these topics
autoAck: true
parallelism: 1
可以通过源码查看WordCountFunction的逻辑,可以看到会根据输入进行分词统计
/*** The classic word count example done using pulsar functions* Each input message is a sentence that split into words and each word counted.* The built in counter state is used to keep track of the word count in a* persistent and consistent manner.*/
public class WordCountFunction implements Function<String, Void> {@Overridepublic Void process(String input, Context context) {Arrays.asList(input.split("\\s+")).forEach(word -> context.incrCounter(word, 1));return null;}
}
- 查询状态,可以看到输出是
key 'hello' doesn't exist.
pulsar-admin functions querystate \--tenant test \--namespace test-namespace \--name word_count -k hello -w
- 生产数据
pulsar-client produce -m "hello" -n 10 test_wordcount_src
执行两次查询状态可以看到 numberValue为 20,说明状态是会累加的
3. 启动窗口function
- 启动窗口function
pulsar-admin functions create \--function-config-file examples/example-window-function-config.yaml \--jar examples/api-examples.jar
可以通过查看配置 examples/example-window-function-config.yaml
tenant: "test"
namespace: "test-namespace"
name: "window-example"
className: "org.apache.pulsar.functions.api.examples.AddWindowFunction"
inputs: ["test_window_src"]
output: "test_window_result"
autoAck: true
parallelism: 1# every 5 messages, calculate sum of the latest 10 messages
windowConfig:windowLengthCount: 10slidingIntervalCount: 5
可以通过源码查看AddWindowFunction的逻辑,可以看到会根据输入的数据进行累加。结合配置可以看到会累加最近10条消息
/*** Example Function that acts on a window of tuples at a time rather than per tuple basis.*/
@Slf4j
public class AddWindowFunction implements Function <Collection<Integer>, Integer> {@Overridepublic Integer apply(Collection<Integer> integers) {return integers.stream().reduce(0, (x, y) -> x + y);}
}
4. 启动自定义function
- 基于Function接口实现自己的逻辑,打包编译成jar包
/*** author: shalock.lin* date: 2024/2/18* describe:*/
public class FormatDateFunction implements Function<String, String> {private SimpleDateFormat format1 = new SimpleDateFormat("yyyy/MM/dd HH/mm/ss");private SimpleDateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Overridepublic String process(String input, Context context) throws Exception {Date date = format1.parse(input);return format2.format(date);}
}
- 基于该jar包启动function
pulsar-admin functions create \--tenant public \--namespace default \--name sherlock-manager-pulsar--inputs test1-input-topic \--output persistent://public/default/test1-output-topic \--classname com.sherlock.functions.FormatDateFunction \--jar examples/sherlock-manager-pulsar-1.0-SNAPSHOT.jar
- 启动客户端监听test1-output-topic
pulsar-client consume -s test-sub1 -n 0 persistent://public/default/test1-output-topic
- 往Topic test1-input-topic 里写入数据
pulsar-client produce -m "2024/02/28 18/36/25" persistent://public/default/test1-input-topic
消费者打印
2024-02-28 18:36:25
启动function有时候会遇到下面这个问题,一般都是function命名冲突导致的,修改yaml配置里name在重新create下就可以了
参考文献
-
https://pulsar.apache.org/docs/3.2.x/functions-overview/
-
https://pulsar.apache.org/docs/next/admin-api-functions/
-
https://blog.csdn.net/qian1314520hu/article/details/133925694