前言
在java9的juc包中有一个Flow接口,里面有几个接口 分别为
Publisher 发布者
Subscriber 订阅者
Subscription 订阅关系
Processor 中间操作
用来完成发布订阅模式的响应式开发
我的环境为java17
响应式编程
底层:基于数据缓冲队列+消息驱动模型+异步回调机制
编码:流式编程+链式调用+声明式API
效果:全异步+消息实时处理+高吞吐+占用资源少
简单使用
下面是一个简单的demo,演示了发布者和订阅者如何绑定订阅关系并发布/接收数据
发布者使用的线程为主线程,而订阅者使用的线程为非主线程。
jvm底层已经为整个发布订阅关系做好了异步和缓存区的处理
package com.vhukze.java17demo.test;import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;/*** @author vhukze* @date 2024/10/10 - 17:03*/
public class FlowDemo {public static void main(String[] args) {// 发布者try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {// 一个订阅者Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("绑定订阅关系");this.subscription = subscription;// 请求获取一条数据this.subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("接收到数据:" + item);// 请求获取下一条数据this.subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.out.println("上游发生异常");}@Overridepublic void onComplete() {System.out.println("完成");}};// 绑定订阅关系publisher.subscribe(subscriber);// 发布数据for (int i = 0; i < 5; i++) {// 正常发布数据publisher.submit("数据-" + i);}// 主线程停一会儿 等待订阅者线程执行结束Thread.sleep(20000);// 发布完成 // 已经放到try-with-resource里面了,可以省略关闭的步骤publisher.close();} catch (Exception e) {throw new RuntimeException(e);}}
}
上述代码的运行结果如下图
中间操作
代码如下,所有讲解都写到了注释里
这里中间操作我是写了静态内部类,写到了同一个类里面,方便看
package com.vhukze.java17demo.test;import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;/*** @author vhukze* @date 2024/10/10 - 17:03*/
public class FlowDemo {public static void main(String[] args) {// 发布者try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {// 一个订阅者Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("绑定订阅关系");this.subscription = subscription;// 请求获取一条数据this.subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("s接收到数据:" + item);// 请求获取下一条数据this.subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.out.println("上游发生异常");}@Overridepublic void onComplete() {System.out.println("完成");}};// 哈哈处理器HaHaProcessor haHaProcessor = new HaHaProcessor();// 呵呵处理器HeHeProcessor heHeProcessor = new HeHeProcessor();// 绑定订阅关系 处理器既是发布者也是订阅者 所以步骤为:(发布者绑定处理器 -> 处理器绑定 订阅者/其他处理器) 形成一个责任链publisher.subscribe(haHaProcessor); // 发布者绑定哈哈处理器 此时哈哈处理器相当于订阅者haHaProcessor.subscribe(heHeProcessor); // 哈哈处理器绑定呵呵处理器 此时哈哈处理器相应于发布者 呵呵处理器为订阅者heHeProcessor.subscribe(subscriber); // 呵呵处理器绑定订阅者 此时呵呵处理器相当于发布者// ......可以有无数个中间操作处理器,依次绑定即可 底层数据结构就是链表// 绑定操作就是发布者记住所有订阅者是谁,有数据后给所有订阅者推送数据// 可以拿stream流的api举例,比如第一个处理器做map操作,第二个处理器做filter操作,第三个处理器..... 最后数据到了订阅者// 发布数据for (int i = 0; i < 5; i++) {// 正常发布数据publisher.submit("数据-" + i);}// 主线程停一会儿 等待订阅者线程执行结束Thread.sleep(20000);// 发布完成 // 已经放到try-with-resource里面了,可以省略关闭的步骤publisher.close();} catch (Exception e) {throw new RuntimeException(e);}}// 中间操作处理器 实现Flow.Processor接口 // 继承发布者直接使用发布者的方法,只需要实现订阅者的方法即可static class HaHaProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("绑定哈哈处理器订阅关系");this.subscription = subscription;// 请求获取一条数据this.subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("haha接收到数据:" + item);// 给源数据后面拼个哈哈item += "哈哈";// 把加工后的数据提交出去 这里就是调用了父类 也就是SubmissionPublisher发布者的方法super.submit(item);// 请求获取下一条数据this.subscription.request(1);}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onComplete() {}}// 中间操作处理器 实现Flow.Processor接口 // 继承发布者直接使用发布者的方法,只需要实现订阅者的方法即可static class HeHeProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("绑定呵呵处理器订阅关系");this.subscription = subscription;// 请求获取一条数据this.subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("hehe接收到数据:" + item);// 给源数据后面拼个呵呵item += "呵呵";// 把加工后的数据提交出去 这里就是调用了父类 也就是SubmissionPublisher发布者的方法super.submit(item);// 请求获取下一条数据this.subscription.request(1);}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onComplete() {}}
}
上述代码执行结果如下
可以看到最后订阅者接收到的数据已经被加工处理过了 后面多了哈哈和呵呵