响应式编程一、Reactor核心

目录

  • 一、前置知识
    • 1、Lambda表达式
    • 2、==函数式接口 Function==
    • 3、==StreamAPI==
    • 4、Reactive-Stream
      • 1)几个实际的问题
      • 2)Reactive-Stream是什么?
      • 3)==核心接口==
      • 4)处理器 Processor
      • 5)总结
  • 二、Reactor核心
    • 1、Reactor
      • 1)介绍
      • 2)Reactor的三个核心特性
      • 3)响应式编程

课程内容
在这里插入图片描述

一、前置知识

1、Lambda表达式

interface MyInterface {int sum(int i, int j);
}interface MyHaha {int haha();default int heihei() {return 2;}; //默认实现
}@FunctionalInterface //检查注解,帮我们快速检查我们写的接口是否函数式接口
interface MyHehe {int hehe(int i);}/*** lambda简化函数式接口实例创建** @param args*/public static void aaaa(String[] args) {//1、自己创建实现类对象MyInterface myInterface = new MyInterfaceImpl();System.out.println(myInterface.sum(1, 2));//2、创建匿名实现类MyInterface myInterface1 = new MyInterface() {@Overridepublic int sum(int i, int j) {return i * i + j * j;}};
//        System.out.println(myInterface1.sum(2, 3));//冗余写法//3、lambda表达式:语法糖  参数列表  + 箭头 + 方法体MyInterface myInterface2 = (x, y) -> {return x * x + y * y;};System.out.println(myInterface2.sum(2, 3));}//参数位置最少情况MyHaha myHaha = () -> {return 1;};MyHehe myHehe = y -> {return y * y;};MyHehe hehe2 = y -> y - 1;//总结://1)、参数类型可以不写,只写(参数名),参数变量名随意定义;//    参数表最少可以只有一个 (),或者只有一个参数名;//2、方法体如果只有一句话,{} 可以省略

2、函数式接口 Function

接口中有且只有一个未实现的方法,这个接口就叫做函数式接口

函数式接口的出入参定义:
1、有入参,无出参【消费者】BiConsumer

 BiConsumer<String,String> function = (a,b)->{ //能接受两个入参System.out.println("哈哈:"+a+";呵呵:"+b);};function.accept("1","2");

2、有入参,有出参【多功能函数】: Function

Function<String,Integer> function = (String x) -> Integer.parseInt(x);System.out.println(function.apply("2"));

3、无入参,无出参【普通函数】:

Runnable runnable = () -> System.out.println("aaa");
new Thread(runnable).start();

4、无入参 ,有出参【提供者】: supplier

Supplier<String> supplier = ()-> UUID.randomUUID().toString();
String s = supplier.get();
System.out.println(s);

java.util.function包下的所有function定义:
● Consumer: 消费者
● function: 功能函数
● Supplier: 提供者
● Predicate: 断言
get/test/apply/accept调用的函数方法;

3、StreamAPI

中间操作:Intermediate Operations

  • filter:过滤; 挑出我们用的元素
  • map: 映射: 一一映射,a 变成 b
    • mapToInt、mapToLong、mapToDouble
  • flatMap:一对多映射

filter、 map、mapToInt、mapToLong、mapToDouble flatMap、flatMapToInt、flatMapToLong、flatMapToDouble mapMulti、mapMultiToInt、mapMultiToLong、mapMultiToDouble、 parallel、unordered、onClose、sequential distinct、sorted、peek、limit、skip、takeWhile、dropWhile、

终止操作:Terminal Operation
forEach、forEachOrdered、toArray、reduce、collect、toList、min、 max、count、anyMatch、allMatch、noneMatch、findFirst、findAny、iterator



4、Reactive-Stream

1)几个实际的问题

当请求量巨大的时候,tomcat会被压垮,此时就需要采取背压的策略,让tomcat根据自己的能力主动去消费请求。
在这里插入图片描述

服务器核心数固定时,多线程情况下:线程越多越好吗?
当核心数固定时,线程并不是越多越好,操作系统是分时间片执行任务的,当线程越多时,线程间的切换就越频繁,导致cpu性能消耗越多。
在这里插入图片描述
在这里插入图片描述

2)Reactive-Stream是什么?

Reactive-Streams 是一个标准规范,定义了异步数据流处理的 API 和行为规则,专注于解决异步流式数据的**背压(Backpressure)**问题。

主要特性

  • 异步:通过非阻塞方式处理数据流。
  • 流式处理:支持连续数据流的逐步消费,避免一次性加载大量数据。
  • 背压机制:允许消费者控制生产者的速率,防止消费者被超量数据淹没。
  • 非阻塞:在处理数据时不阻塞线程,提高资源利用率。

背压机制(Backpressure)
Reactive-Streams 的核心之一是通过 Subscription 提供背压支持。

消费者可以通过 request(n) 方法控制生产者的生产速率。
如果消费者处理能力不足,可以减少请求数据量,避免内存溢出或阻塞。

使用场景
事件流处理:如消息队列、用户事件。
高性能网络请求:如 RESTful API、WebSocket。
大数据流处理:需要逐步消费大规模数据的场景。
异步系统集成:将不同系统间的数据流通过异步方式连接起来。

在这里插入图片描述

3)核心接口

  • Publisher:发布者;产生数据流
  • Subscriber:订阅者; 消费数据流
  • Subscription:订阅关系;
    订阅关系是发布者和订阅者之间的关键接口。订阅者通过订阅来表示对发布者产生的数据的兴趣。订阅者可以请求一定数量的元素,也可以取消订阅。
  • Processor:处理器;
    处理器是同时实现了发布者和订阅者接口的组件。它可以接收来自一个发布者的数据,进行处理,并将结果发布给下一个订阅者。处理器在Reactor中充当中间环节,代表一个处理阶段,允许你在数据流中进行转换、过滤和其他操作。

【扩展】
以前的编程模型是命令式编程: 过程编程,全自定义
流式编程是响应式|声明式编程,说清楚要干什么,最终结果要怎么样

public class MyFlowDemo {public static void main(String[] args) {// 1、定义一个发布者,发布数据SubmissionPublisher<String> publisher = new SubmissionPublisher<>();//3、定义一个订阅者; 订阅者感兴趣发布者的数据;Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {private Flow.Subscription subscription;@Override //在订阅时  onXxxx:在xxx事件发生时,执行这个回调public void onSubscribe(Flow.Subscription subscription) {System.out.println(Thread.currentThread() + "订阅开始了:" + subscription);this.subscription = subscription;//从上游请求一个数据subscription.request(1);}@Override //在下一个元素到达时; 执行这个回调;   接受到新数据public void onNext(String item) {System.out.println(Thread.currentThread() + "订阅者,接受到数据:" + item);//从上游请求一个数据subscription.request(1);}@Override //在错误发生时,public void onError(Throwable throwable) {System.out.println(Thread.currentThread() + "订阅者,接受到错误信号:" + throwable);}@Override //在完成时public void onComplete() {System.out.println(Thread.currentThread() + "订阅者,接受到完成信号:");}};publisher.subscribe(subscriber);for (int i = 0; i < 10; i++) {publisher.submit("p-" + i);}try {Thread.sleep(20000);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}

在这里插入图片描述

4)处理器 Processor

在这里插入图片描述

public class FlowDemo {//定义流中间操作处理器; 只用写订阅者的接口static class MyProcessor extends SubmissionPublisher<String>  implements Flow.Processor<String,String> {private Flow.Subscription subscription; //保存绑定关系@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("processor订阅绑定完成");this.subscription = subscription;subscription.request(1); //找上游要一个数据}@Override //数据到达,触发这个回调public void onNext(String item) {System.out.println("processor拿到数据:"+item);//再加工item += ":哈哈";submit(item);//把我加工后的数据发出去subscription.request(1); //再要新数据}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onComplete() {}}/*** 1、Publisher:发布者* 2、Subscriber:订阅者* 3、Subscription: 订阅关系* 4、Processor: 处理器* @param args*///发布订阅模型:观察者模式,public static void main(String[] args) throws InterruptedException {//1、定义一个发布者; 发布数据;SubmissionPublisher<String> publisher = new SubmissionPublisher<>();//2、定一个中间操作:  给每个元素加个 哈哈 前缀MyProcessor myProcessor1 = new MyProcessor();//3、定义一个订阅者; 订阅者感兴趣发布者的数据;Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {private Flow.Subscription subscription;@Override //在订阅时  onXxxx:在xxx事件发生时,执行这个回调public void onSubscribe(Flow.Subscription subscription) {System.out.println(Thread.currentThread()+"订阅开始了:"+subscription);this.subscription = subscription;//从上游请求一个数据subscription.request(1);}@Override //在下一个元素到达时; 执行这个回调;   接受到新数据public void onNext(String item) {System.out.println(Thread.currentThread()+"订阅者,接受到数据:"+item);if(item.equals("p-7")){subscription.cancel(); //取消订阅}else {subscription.request(1);}}@Override //在错误发生时,public void onError(Throwable throwable) {System.out.println(Thread.currentThread()+"订阅者,接受到错误信号:"+throwable);}@Override //在完成时public void onComplete() {System.out.println(Thread.currentThread()+"订阅者,接受到完成信号:");}};//4、绑定发布者和订阅者publisher.subscribe(myProcessor1); //此时处理器相当于订阅者myProcessor1.subscribe(subscriber); //此时处理器相当于发布者//绑定操作;就是发布者,记住了所有订阅者都有谁,有数据后,给所有订阅者把数据推送过去。//        publisher.subscribe(subscriber);for (int i = 0; i < 10; i++) {//发布10条数据if(i == 5){
//                publisher.closeExceptionally(new RuntimeException("5555"));}else {publisher.submit("p-"+i);}//publisher发布的所有数据在它的buffer区;//中断
//            publisher.closeExceptionally();}//ReactiveStream//jvm底层对于整个发布订阅关系做好了 异步+缓存区处理 = 响应式系统;//发布者通道关闭publisher.close();//        publisher.subscribe(subscriber2);//发布者有数据,订阅者就会拿到Thread.sleep(20000);}
}

5)总结

在这里插入图片描述

二、Reactor核心

响应式编程:

1、底层:基于数据缓冲队列 + 消息驱动模型 + 异步回调机制
2、编码:流式编程 + 链式调用 + 声明式API
3、效果:优雅全异步 + 消息实时处理 + 高吞吐量 + 占用少量资源

回调机制:类似于SpringBoot的事件机制,在SpringBoot应用的启动过程中触发事件。

1、Reactor

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

1)介绍

Reactor 是一个用于在JVM构建非阻塞应用的响应式编程框架 !

2)Reactor的三个核心特性

3)响应式编程

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/482393.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker for Everyone Plus——No Enough Privilege

直接告诉我们flag在/flag中&#xff0c;访问第一小题&#xff1a; sudo -l查看允许提权执行的命令&#xff1a; 发现有image load命令 题目指明了有rz命令&#xff0c;可以用ZMODEM接收文件&#xff0c;看到一些write up说可以用XShell、MobaXterm、Tabby Terminal等软件连接上…

深度学习基础2

1.损失函数 1.1 线性回归损失函数 1.1.1 MAE损失 MAE&#xff08;Mean Absolute Error&#xff0c;平均绝对误差&#xff09;通常也被称为 L1-Loss&#xff0c;通过对预测值和真实值之间的绝对差取平均值来衡量他们之间的差异。。 公式&#xff1a; 其中&#xff1a; n 是样…

【Android】组件化嘻嘻嘻gradle耶耶耶

文章目录 Gradle基础总结&#xff1a;gradle-wrapper项目根目录下的 build.gradlesetting.gradle模块中的 build.gradlelocal.properties 和 gradle.properties 组件化&#xff1a;项目下新建一个Gradle文件定义一个ext扩展区域config.gradle全局基础配置&#xff08;使用在项目…

基础Web安全|SQL注入

基础Web安全 URI Uniform Resource Identifier&#xff0c;统一资源标识符&#xff0c;用来唯一的标识一个资源。 URL Uniform Resource Locator&#xff0c;统一资源定位器&#xff0c;一种具体的URI&#xff0c;可以标识一个资源&#xff0c;并且指明了如何定位这个资源…

【插入排序】:直接插入排序、二分插入排序、shell排序

【插入排序】&#xff1a;直接插入排序、二分插入排序、shell排序 1. 直接插入排序1.1 详细过程1.2 代码实现 2. 二分插入排序2.1 详细过程2.2 代码实现 3. shell排序3.1 详细过程3.2 代码实现 1. 直接插入排序 1.1 详细过程 1.2 代码实现 public static void swap(int[]arr,…

一万台服务器用saltstack还是ansible?

一万台服务器用saltstack还是ansible? 选择使用 SaltStack 还是 Ansible 来管理一万台服务器&#xff0c;取决于几个关键因素&#xff0c;如性能、扩展性、易用性、配置管理需求和团队的熟悉度。以下是两者的对比分析&#xff0c;帮助你做出决策&#xff1a; SaltStack&…

通讯专题4.1——CAN通信之计算机网络与现场总线

从通讯专题4开始&#xff0c;来学习CAN总线的内容。 为了更好的学习CAN&#xff0c;先从计算机网络与现场总线开始了解。 1 计算机网络体系的结构 在我们生活当中&#xff0c;有许多的网络&#xff0c;如交通网&#xff08;铁路、公路等&#xff09;、通信网&#xff08;电信、…

使用OSPF配置不同进程的中小型网络

要求&#xff1a; 给每个设备的接口配置好相应的地址 对进程1的各区域使用认证&#xff0c;认证为明文发送&#xff0c;明文保存 对骨干区域使用接口认证&#xff0c;非骨干区域使用区域认证 其他ospf进程均使用区域0 FW1上配置接口信任域和非信任域和服务器&#xff0c…

软考高项经验分享:我的备考之路与实战心得

软考&#xff0c;尤其是信息系统项目管理师&#xff08;高项&#xff09;考试&#xff0c;对于众多追求职业提升与专业认可的人士来说&#xff0c;是一场充满挑战与机遇的征程。我在当年参加软考高项的经历&#xff0c;可谓是一波三折&#xff0c;其中既有成功的喜悦&#xff0…

Transformers在计算机视觉领域中的应用【第1篇:ViT——Transformer杀入CV界之开山之作】

目录 1 模型结构2 模型的前向过程3 思考4 结论 论文&#xff1a; AN IMAGE IS WORTH 16X16 WORDS: TRANSFORMERS FOR IMAGE RECOGNITION AT SCALE 代码&#xff1a;https://github.com/google-research/vision_transformer Huggingface&#xff1a;https://github.com/huggingf…

Unity3D模型场景等测量长度和角度功能demo开发

最近项目用到多段连续测量物体长度和角度功能&#xff0c;自己研究了下。 1.其中向量角度计算&#xff1a; 需要传入三个坐标来进行计算。三个坐标确定两条向量线段的方向&#xff0c;从而来计算夹角。 public Vector3 SetAngle(Vector3 p1, Vector3 p2,Vector3 p3) { …

蓝桥杯第 23 场 小白入门赛

一、前言 好久没打蓝桥杯官网上的比赛了&#xff0c;回来感受一下&#xff0c;这难度区分度还是挺大的 二、题目总览 三、具体题目 3.1 1. 三体时间【算法赛】 思路 额...签到题 我的代码 // Problem: 1. 三体时间【算法赛】 // Contest: Lanqiao - 第 23 场 小白入门赛 …

从0开始学PHP面向对象内容之常用设计模式(享元)

二、结构型设计模式 7、享元模式&#xff08;Flyweight Pattern&#xff09; 这里是引用享元模式&#xff08;Flyweight Pattern&#xff09; 是一种结构型设计模式&#xff0c;旨在通过共享对象来减少内存使用&#xff0c;尤其适用于大量相似对象的场景。通过共享和重用对象的…

YOLO 标注工具 AutoLabel 支持 win mac linux

常见的标注工具&#xff0c;功能基础操作繁琐&#xff0c;无复制粘贴&#xff0c;标签无法排序修改&#xff0c;UI不美观&#xff0c;bug修正不及时&#xff0c;没有集成识别、训练、模型导出… 怎么办呢&#xff1f;AutoLabel它来了 Quick Start 一图胜千言 图像标注 支持YOL…

《Python基础》之Python中可以转换成json数据类型的数据

目录 一、JSON简介 JSON有两种基本结构 1、对象&#xff08;Object&#xff09; 2、数组&#xff08;Array&#xff09; 二、将数据装换成json数据类型方法 三、在Python中&#xff0c;以下数据类型可以直接转换为JSON数据类型 1、字典&#xff08;Dictionary&#xff09…

如何在Bash中等待多个子进程完成,并且当其中任何一个子进程以非零退出状态结束时,使主进程也返回一个非零的退出码?

文章目录 问题回答参考 问题 如何在 Bash 脚本中等待该脚本启动的多个子进程完成&#xff0c;并且当这其中任意一个子进程以非零退出码结束时&#xff0c;让该脚本也返回一个非零的退出码&#xff1f; 简单的脚本: #!/bin/bash for i in seq 0 9; docalculations $i & d…

远程桌面协助控制软件 RustDesk v1.3.3 多语言中文版

RustDesk 是一款开源的远程桌面软件&#xff0c;支持多平台操作&#xff0c;包括Windows、macOS、Linux、iOS、Android和Web。它提供端到端加密和基于角色的访问控制&#xff0c;确保安全性和隐私保护。使用简单&#xff0c;无需复杂配置&#xff0c;通过输入ID和密码即可快速连…

LWIP和FATFS 实现 FTP 服务端

目录 一、前言 二、LWIP 和 FTP 简介 1.LWIP 2.FTP 三、实现 FTP 服务端的主要步骤 1.初始化 LWIP 2.创建 FTP 服务器任务 3.处理客户端连接 4.实现 FTP 命令处理 5.文件系统操作 6.错误处理和日志记录 四、示例代码 1.创建FTP任务 2. FTP任务代码 3.处理交互数据…

多线程篇-8--线程安全(死锁,常用保障安全的方法,安全容器,原子类,Fork/Join框架等)

1、线程安全和不安全定义 &#xff08;1&#xff09;、线程安全 线程安全是指一个类或方法在被多个线程访问的情况下可以正确得到结果&#xff0c;不会出现数据不一致或其他错误行为。 线程安全的条件 1、原子性&#xff08;Atomicity&#xff09; 多个操作要么全部完成&a…

【css实现收货地址下边的平行四边形彩色线条】

废话不多说&#xff0c;直接上代码&#xff1a; <div class"address-block" ><!-- 其他内容... --><div class"checked-ar"></div> </div> .address-block{height:120px;position: relative;overflow: hidden;width: 500p…