java9的juc包中的Flow接口(响应式编程/发布订阅模式)

前言

在java9的juc包中有一个Flow接口,里面有几个接口 分别为

Publisher 发布者
Subscriber 订阅者
Subscription 订阅关系
Processor 中间操作

用来完成发布订阅模式的响应式开发

我的环境为java17

响应式编程

底层:基于数据缓冲队列+消息驱动模型+异步回调机制

编码:流式编程+链式调用+声明式API

效果:全异步+消息实时处理+高吞吐+占用资源少

简单使用

下面是一个简单的demo,演示了发布者和订阅者如何绑定订阅关系并发布/接收数据

发布者使用的线程为主线程,而订阅者使用的线程为非主线程。

jvm底层已经为整个发布订阅关系做好了异步和缓存区的处理

package com.vhukze.java17demo.test;import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;/*** @author vhukze* @date 2024/10/10 - 17:03*/
public class FlowDemo {public static void main(String[] args) {// 发布者try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {// 一个订阅者Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("绑定订阅关系");this.subscription = subscription;// 请求获取一条数据this.subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("接收到数据:" + item);// 请求获取下一条数据this.subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.out.println("上游发生异常");}@Overridepublic void onComplete() {System.out.println("完成");}};// 绑定订阅关系publisher.subscribe(subscriber);// 发布数据for (int i = 0; i < 5; i++) {// 正常发布数据publisher.submit("数据-" + i);}// 主线程停一会儿 等待订阅者线程执行结束Thread.sleep(20000);// 发布完成 // 已经放到try-with-resource里面了,可以省略关闭的步骤publisher.close();} catch (Exception e) {throw new RuntimeException(e);}}
}

上述代码的运行结果如下图

 

 

中间操作

代码如下,所有讲解都写到了注释里

这里中间操作我是写了静态内部类,写到了同一个类里面,方便看

package com.vhukze.java17demo.test;import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;/*** @author vhukze* @date 2024/10/10 - 17:03*/
public class FlowDemo {public static void main(String[] args) {// 发布者try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {// 一个订阅者Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("绑定订阅关系");this.subscription = subscription;// 请求获取一条数据this.subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("s接收到数据:" + item);// 请求获取下一条数据this.subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.out.println("上游发生异常");}@Overridepublic void onComplete() {System.out.println("完成");}};// 哈哈处理器HaHaProcessor haHaProcessor = new HaHaProcessor();// 呵呵处理器HeHeProcessor heHeProcessor = new HeHeProcessor();// 绑定订阅关系 处理器既是发布者也是订阅者 所以步骤为:(发布者绑定处理器 -> 处理器绑定 订阅者/其他处理器)  形成一个责任链publisher.subscribe(haHaProcessor); // 发布者绑定哈哈处理器 此时哈哈处理器相当于订阅者haHaProcessor.subscribe(heHeProcessor); // 哈哈处理器绑定呵呵处理器 此时哈哈处理器相应于发布者 呵呵处理器为订阅者heHeProcessor.subscribe(subscriber); // 呵呵处理器绑定订阅者 此时呵呵处理器相当于发布者// ......可以有无数个中间操作处理器,依次绑定即可   底层数据结构就是链表// 绑定操作就是发布者记住所有订阅者是谁,有数据后给所有订阅者推送数据// 可以拿stream流的api举例,比如第一个处理器做map操作,第二个处理器做filter操作,第三个处理器..... 最后数据到了订阅者// 发布数据for (int i = 0; i < 5; i++) {// 正常发布数据publisher.submit("数据-" + i);}// 主线程停一会儿 等待订阅者线程执行结束Thread.sleep(20000);// 发布完成 // 已经放到try-with-resource里面了,可以省略关闭的步骤publisher.close();} catch (Exception e) {throw new RuntimeException(e);}}// 中间操作处理器 实现Flow.Processor接口 // 继承发布者直接使用发布者的方法,只需要实现订阅者的方法即可static class HaHaProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("绑定哈哈处理器订阅关系");this.subscription = subscription;// 请求获取一条数据this.subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("haha接收到数据:" + item);// 给源数据后面拼个哈哈item += "哈哈";// 把加工后的数据提交出去 这里就是调用了父类 也就是SubmissionPublisher发布者的方法super.submit(item);// 请求获取下一条数据this.subscription.request(1);}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onComplete() {}}// 中间操作处理器 实现Flow.Processor接口 // 继承发布者直接使用发布者的方法,只需要实现订阅者的方法即可static class HeHeProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("绑定呵呵处理器订阅关系");this.subscription = subscription;// 请求获取一条数据this.subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("hehe接收到数据:" + item);// 给源数据后面拼个呵呵item += "呵呵";// 把加工后的数据提交出去 这里就是调用了父类 也就是SubmissionPublisher发布者的方法super.submit(item);// 请求获取下一条数据this.subscription.request(1);}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onComplete() {}}
}

上述代码执行结果如下

可以看到最后订阅者接收到的数据已经被加工处理过了 后面多了哈哈和呵呵 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/443829.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

简单的网络爬虫爬取视频

示例代码爬取一个周杰伦相关视频 import requests# 自己想下载的视频链接 video_url https://vdept3.bdstatic.com/mda-qg8cnf4bw5x6bjs5/cae_h264/1720516251158906693/mda-qg8cnf4bw5x6bjs5.mp4?v_from_shkapp-haokan-hbf&auth_key1728497433-0-0-4a32e13f751e04754e4…

RandLA-Net PB C++

tensorflow pb 模型 实现 c++ 部署 Code: https://github.com/QingyongHu/RandLA-Net RandLA-Net PB C++ randlanet_tf.h #ifndef RANDLANET_TF_H_

gaussdb hccdp认证模拟题(判断)

1.在事务ACID特性中&#xff0c;原子性指的是事务必须始终保持系统处于一致的状态。(1 分) 错。 2.某IT公司在开发软件时&#xff0c;需要使用GaussDB数据库&#xff0c;因此需要实现软件和数据的链接&#xff0c;而DBeaver是一个通用的数据库管理工具和 SQL 客户端&#xff…

【windows Server 2012】把我的电脑放在桌面

WinR 打开命令输入框 输入 rundll32.exe shell32.dll,Control_RunDLL desk.cpl,,0

深入理解 CSS 浮动(Float):详尽指南

“批判他人总是想的太简单 剖析自己总是想的太困难” 文章目录 前言文章有误敬请斧正 不胜感恩&#xff01;目录1. 什么是 CSS 浮动&#xff1f;2. CSS 浮动的历史背景3. 基本用法float 属性值浮动元素的行为 4. 浮动对文档流的影响5. 清除浮动clear 属性清除浮动的技巧1. 使用…

推荐一个物联网平台,支持源代码交付

ThingsKit物联网平台概述&#xff1a; ThingsKit是一个开箱即用的物联网平台&#xff0c;它支持通过行业标准的物联网协议&#xff08;如MQTT、TCP、UDP、CoAP和HTTP&#xff09;实现设备连接。这个平台能够帮助用户快速实现物联网的数据收集、分析处理、可视化和设备管理&…

【韩顺平Java笔记】第8章:面向对象编程(中级部分)【297-313】

文章目录 297. super基本语法297.1 基本介绍297.2 基本语法 298. super使用细节1299. super使用细节2300. super使用细节3301. 方法重写介绍302. 方法重写细节303. 重写课堂练习1304. 重写课堂练习2输出结果&#xff1a; 姓名&#xff1a;田所浩二 年龄:24305. 养宠物引出多态3…

河道垃圾数据集 水污染数据集——无人机视角数据集 共3000张图片,可直接用于河道垃圾、水污染功能检测 已标注yolo格式、voc格式,可直接训练;

河道垃圾数据集 水污染数据集——无人机视角数据集 共3000张图片&#xff0c;可直接用于河道垃圾、水污染功能检测 已标注yolo格式、voc格式&#xff0c;可直接训练&#xff1b; 河道垃圾与水污染检测数据集&#xff08;无人机视角&#xff09; 项目概述 本数据集是一个专门用…

短剧小程序短剧APP在线追剧APP网剧推广分销微短剧小剧场小程序集师知识付费集师短剧小程序集师小剧场小程序集师在线追剧小程序源码

一、产品简介功能介绍 集师专属搭建您的独有短剧/追剧/小剧场小程序或APP平台 二、短剧软件私域运营解决方案 针对短剧类小程序的运营&#xff0c;以下提出10条具体的方案&#xff1a; 明确定位与目标用户&#xff1a; 对短剧类小程序进行明确定位&#xff0c;了解目标用户群体…

Chatgpt 原理解构

一、背景知识 1. 自然语言处理的发展历程 自然语言处理在不同时期呈现出不同的特点和发展态势。萌芽期&#xff0c;艾伦・图灵在 1936 年提出 “图灵机” 概念&#xff0c;为计算机诞生奠定基础&#xff0c;1950 年他提出著名的 “图灵测试”&#xff0c;预见了计算机处理自然…

Oracle 闪回版本(闪回表到指定SCN)

1.创建目录 mkdir /u01/app/oracle/flash 2.配置FRA alter system set db_recovery_file_dest_size15G; alter system set db_recovery_file_dest/u01/app/oracle/flash; 3.设置闪回参数--确保可以闪回48h内的数据库 alter system set db_flashback_retention_target2880; 4…

望繁信科技成功签约国显科技 流程挖掘助力制造业智造未来

近日&#xff0c;上海望繁信科技有限公司&#xff08;简称“望繁信科技”&#xff09;成功与深圳市国显科技有限公司&#xff08;简称“国显科技”&#xff09;达成合作。国显科技作为全球领先的TFT-LCD液晶显示及Mini/Micro LED显示产品供应商&#xff0c;致力于为笔记本、手机…

经典蓝牙BLE版本区别:【图文讲解】

蓝牙是一种短距的无线通讯技术&#xff0c;可实现固定设备、移动设备之间的数据交换。一般将蓝牙3.0之前的BR/EDR蓝牙称为传统蓝牙&#xff0c;而将蓝牙4.0规范下的LE蓝牙称为低功耗蓝牙&#xff08;BLE&#xff09;。 1&#xff1a;蓝牙4.0 BLE 4.0版本是3.0版本的升级版本&a…

uniapp学习(004-1 组件 Part.2生命周期)

零基础入门uniapp Vue3组合式API版本到咸虾米壁纸项目实战&#xff0c;开发打包微信小程序、抖音小程序、H5、安卓APP客户端等 总时长 23:40:00 共116P 此文章包含第31p-第p35的内容 文章目录 组件生命周期我们主要使用的三种生命周期setup(创建组件时执行)不可以操作dom节点…

Solidity优质例子(二)物流的增删改查智能合约(附truffle测试)

本合约非常适合新手学习&#xff0c;其包含了基本的增删改查功能以及各个方式的不同之处的总结&#xff0c;本套合约我也编写了truffle测试&#xff0c;学习truffle测试的小伙伴也有福了~ 该合约的主要作用是通过区块链技术实现物流追踪系统的透明化、自动化与防篡改特性&#…

乐歌E5,E6系列升降桌质量如何?2024推荐必买的四款热销型号

在数字化时代&#xff0c;电脑桌成为了我们日常生活和工作中不可或缺的一部分。然而&#xff0c;长时间坐在固定高度的电脑桌前&#xff0c;不仅会影响我们的工作效率&#xff0c;还可能对身体健康造成不良影响。因此&#xff0c;一款能够电动升降的电脑桌显得尤为重要。 乐歌…

RabbbitMQ篇(环境搭建 - 下载 安装)(持续更新迭代)

目录 一、Windows 1. 下载安装程序 2. 安装配置erlang 3. 安装rabbitMQ 4. 验证 二、Linux 1. 下载rpm包 1.1. 下载Erlang的rpm包 1.2. 下载socat的rpm包 1.3. 下载RabbitMQ的rpm包 2. 安装 2.1. 安装Erlang 2.2. 安装socat 2.3. 安装RabbitMQ 3. 启动RabbitMQ服…

【含开题报告+文档+PPT+源码】基于SpringBoot的社区家政服务预约系统设计与实现【包运行成功】

开题报告 社区家政服务是满足居民日常生活需求的重要组成部分&#xff0c;在现代社会中发挥着越来越重要的作用。随着城市化进程的不断加速&#xff0c;社区家政服务需求量呈现持续增长的趋势。然而&#xff0c;传统的家政服务模式存在一些问题&#xff0c;如预约流程繁琐、信…

【Python自动化测试】如何才能让用例自动运行完之后,生成一张直观可看易懂的测试报告呢?

小编使用的是unittest的一个扩展HTMLTestRunner 环境准备 使用之前&#xff0c;我们需要下载HTMLTestRunner.py文件 点击HTMLTestRunner后进入的是一个写满代码的网页&#xff0c;小编推荐操作&#xff1a;右键 --> 另存为&#xff0c;文件名称千万不要改 python3使用上述…

手撕AVL树

&#x1f525;个人主页&#x1f525;&#xff1a;鱼骨不是鱼翅-CSDN博客 &#x1f308;收录专栏&#x1f308;&#xff1a;高阶数据结构_鱼骨不是鱼翅的博客-CSDN博客 &#x1f516; 学如逆水行舟&#xff0c;不进则退 目录 一.AVL树的概念 二.平衡因子 三.平衡二叉树怎么调…