🎼个人主页:【Y小夜】
😎作者简介:一位双非学校的大三学生,编程爱好者,
专注于基础和实战分享,欢迎私信咨询!
🎆入门专栏:🎇【MySQL,Java基础,Rust】
🎈热门专栏:🎊【Python,Javaweb,Springboot】
感谢您的点赞、关注、评论、收藏、是对我最大的认可和支持!❤️
目录
🎈假删(逻辑删除)
🎈启动端口总是被占用
🎉和其他启动程序端口冲突
🎉 Hyper-V虚拟机
🎈文件上传校验
🎈分库分表
🎈限流机制
🎉 常见的限流算法
🎊固定窗口限流算法
🎊滑动窗口限流算法
🎊漏桶算法
🎊令牌桶算法
🎈异步化以及线程池
🎉 异步化
🎉 线程池
🎊线程池的作用
🎊线程池的实现
🎊任务类型
🎈消息队列
🎉 模型
🎉 优势
🎊应用解耦的优势
🎈RabbitMQ入门
🎉 安装
🎉 配置RabbitMQ
🎉 常用端口
🎉 入门程序
🎊Hello World!
什么是频道channel?
核心方法
🎊Work Queues
核心代码
消息确认机制
🎊Publish/Subscribe
🎊Routing
🎊Topics
🎉 RabbitMQ的一些机制
🎊消息应答机制
🎊持久化
🎊死信队列
🎊延迟队列
🎈假删(逻辑删除)
在配置文件中进行配置
然后直接在需要进行假删的字段上加上@TableLogic,例如
@TableLogic //逻辑删除注解private Integer showStatus;
注:如果某一张表不想跟随全局逻辑删除的规则,也可在注解上自定义。执行时优先注解的规则:
/*** value不删除* delval 删除*/@TableLogic(value = "1",delval = "0") //逻辑删除注解private Integer showStatus;
🎈启动端口总是被占用
🎉和其他启动程序端口冲突
这种直接把另一个程序关闭就行,再次重新其中。
🎉 Hyper-V虚拟机
Hyper-V 是微软的一款虚拟机产品,允许在 Windows 上以虚拟机形式运行多个操作系统
- 方法一:直接把Hyper-v给禁用(但不建议,会影响一些windows功能)
- 方法二:查看被占用的端口,然后选择一个没被占用的端口启动项目。cmd命令打开输入
netsh interface ipv4 show excludedportrange protocol=tcp
🎈文件上传校验
只要涉及到用户自主上传的操作,一定要校验文件(图像) 校验的维度:
- 文件的大小
- 文件的后缀
- 文件的内容(成本要高一些)
- 文件的合规性(比如敏感内容,建议用第三方的审核功能) 扩展点:接入腾讯云的图片万象数据审核(COS 对象存储的审核功能)
🎈分库分表
分库(Database Sharding): 将数据按照某种规则,分散到多个独立的数据库中,每个数据库称为一个“分库”。
分表(Table Sharding): 将一个大表的数据按照某种规则,分散到多个小表中,每个小表称为一个“分片”、或“分表”。
总结一句话就是,分表指在数据量大的情况下,将表按照某个字段的值进行拆分和分散存储,例如拆分出前 1 万个用户一个表,后 1 万个用户一个表。 分库则是将不同的业务按照相关性进行划分,例如将用户中心用户相关的内容划分到一个库中,订单、支付信息和订单相关的划分到另一个库中,从而提高系统的可扩展性和稳定性。
为什么要分库分表?
在系统不断发展、数据量急剧增加的情况下,传统的数据库架构往往难以应对性能和扩展性的问题。特别是当单表的数据量达到千万、甚至亿级别时,即使使用了索引,查询性能也会受到影响。a
🎈限流机制
可以防止DoS攻击和限制Web爬虫:
- 控制成本 => 限制用户调用总次数
- 用户在短时间内疯狂使用,导致服务器资源被占满,其他用户无法使用 => 限流
🎉 常见的限流算法
🎊固定窗口限流算法
首先维护一个计数器,将单位时间段当做一个窗口,计数器记录这个窗口接收请求的次数。
- 当次数少于限流阀值,就允许访问,并且计数器+1
- 当次数大于限流阀值,就拒绝访问。
- 当前的时间窗口过去之后,计数器清零。
🎊滑动窗口限流算法
滑动窗口限流解决固定窗口临界值的问题。它将单位时间周期分为n个小周期,分别记录每个小周期内接口的访问次数,并且根据时间滑动删除过期的小周期。
🎊漏桶算法
漏桶算法面对限流,就更加的柔性,不存在直接的粗暴拒绝。
它的原理很简单,可以认为就是注水漏水的过程。往漏桶中以任意速率流入水,以固定的速率流出水。当水超过桶的容量时,会被溢出,也就是被丢弃。因为桶容量是不变的,保证了整体的速率。
🎊令牌桶算法
- 有一个令牌管理员,根据限流大小,定速往令牌桶里放令牌。
- 如果令牌数量满了,超过令牌桶容量的限制,那就丢弃。
- 系统在接受到一个用户请求时,都会先去令牌桶要一个令牌。如果拿到令牌,那么就处理这个请求的业务逻辑;
- 如果拿不到令牌,就直接拒绝这个请求。
Guava的RateLimiter限流组件,就是基于令牌桶算法实现的。
具体关于限流的相关知识细节,可以去看这篇大佬写的文章:4种经典限流算法讲解
🎈异步化以及线程池
🎉 异步化
异步化的一般流程:
1.当用户要进行耗时很长的操作时,点击提交后,不需要在界面空等,而是应该把这个任务保存到数据库中记录下来
2.用户要执行新任务时:
任务提交成功:
若程序存在空闲线程,可以立即执行此任务 若所有线程均繁忙,任务将入队列等待处理
任务提交失败:
比如所有线程都在忙碌且任务队列满了 选择拒绝此任务,不再执行
3.通过查阅数据库记录,发现提交失败的任务,并在程序空闲时将这些任务取出执行 程序(线程)
4.从任务队列中取出任务依次执行,每完成一项任务,就更新任务状态。
5.用户可以查询任务的执行状态,或者在任务执行成功或失败时接收通知(例如:发邮件、系统消息提示或短信),从而优化体验。
6.对于复杂且包含多个环节的任务,在每个小任务完成时,要在程序(数据库中))记录任务的执行状态(进度)。
🎉 线程池
🎊线程池的作用
帮助你轻松管理线程、协调任务的执行过程。 扩充:可以向线程池表达你的需求,比如最多只允许四个人同时执行任务。线程池就能自动为你进行管理。在任务紧急时,它会帮你将任务放入队列。而在任务不紧急或者还有线程空闲时,它会直接将任务交给空闲的线程,而不是放入队列。
🎊线程池的实现
在 Spring 中,我们可以利用 ThreadPoolTaskExecutor 配合 @Async 注解来实现线程池(不太建议)。
ps.虽然 Spring 框架提供了线程池的实现,但并不特别推荐使用。因为 Spring 毕竟是一个框架,它进行了一定程度的封装,可能隐藏了一些细节。更推荐大家直接使用 Java 并发包中的线程池,请注意,这并不是绝对不使用 Spring 的线程池,对其使用有一定的保留意见。
在 Java 中,可以使用JUC并发编程包中的 ThreadPoolExecutor,来实现非常灵活地自定义线程池。
参数解释:
-
corePoolSize:核心线程数,即线程池维护的最小线程数量(包括空闲线程)。当有新任务提交时,如果当前线程数少于corePoolSize,即使有空闲的核心线程,也会创建新的线程来处理任务。
-
maximumPoolSize:最大线程数,即线程池能够容纳的最大线程数量。当队列已满且当前线程数小于maximumPoolSize时,线程池会创建新的线程来处理任务。
-
keepAliveTime:非核心线程在终止前可以保持空闲状态的时间。当线程池中的线程数超过corePoolSize时,多余的线程在空闲时间达到keepAliveTime后会被销毁。
-
unit:keepAliveTime参数的时间单位。
-
workQueue:任务队列,用于存储等待执行的任务。当所有核心线程都在忙碌时,新任务会被放入队列中等待执行。
-
threadFactory:线程工厂,用于创建新线程。可以通过自定义threadFactory来改变线程的创建方式,比如设置线程的名称、优先级等。
-
handler:拒绝策略,当线程池和队列都满了时,无法继续接收新任务,此时需要采取一种策略来处理这个任务。handler就是指定了这种拒绝策略。
🎊任务类型
一般情况下,任务分为 IO 密集型和计算密集型两种。
计算密集型:吃 CPU,比如音视频处理、图像处理、数学计算等,一般是设置 corePoolSize 为 CPU 的核数 + 1(空余线程),可以让每个线程都能利用好 CPU 的每个核,而且线程之间不用频繁切换(减少打架、减少开销)
IO 密集型:吃带宽/内存/硬盘的读写资源,corePoolSize 可以设置大一点,一般经验值是 2n 左右,但是建议以 IO 的能力为主。
🎈消息队列
一般情况下,线程池用于多线程执行任务的情况,而消息队列用于应用解耦
🎉 模型
消息队列主要由四部分组成:消息生产者(Producer)、消息消费者(Consumer)、消息(Message)和消息队列(Queue)
消息队列的一个主要优点就是可以集中存储消息,使得消息的发送者和接收者无需同时在线,实现了发送者和接收者的解耦。这就是消息队列的核心作用,以及为什么我们需要使用消息队列的原因。
🎉 优势
异步处理:一旦生产者发送完消息,便可以立即转向其他任务,而消费者则可以在任何时候开始处理消息。这样一来,生产者和消费者之间就不会发生阻塞。
削峰填谷:消息队列允许我们先将用户请求存储起来,然后消费者(或说实际执行任务的应用)可以根据自身的处理能力和需求,逐步从队列中取出并处理请求。
虽然线程池也能实现削峰填谷的效果,但它并没有消息队列这样的存储灵活性,或者说,消息队列能实现的持久化存储:
- 1)数据持久化:它可以把消息集中存储到硬盘里,服务器重启就不会丢失
- 2) 可扩展性:可以根据需求,随时增加(或减少)节点,继续保持稳定的服务
- 3) 应用解耦:可以连接各个不同语言、框架开发的系统,让这些系统能够灵活传输读取数据
- 4) 发布订阅:如果一个非常大的系统要给其他子系统发送通知,最简单直接的方式是大系统直接依次调用小系统。
🎊应用解耦的优势
- 一个系统挂了,不影响另一个系统
- 系统挂了并恢复后,仍然可以取出消息,继续执行业务逻辑
- 只要发送消息到队列,就可以立刻返回,不用同步调用所有系统,性能更高
🎈RabbitMQ入门
首先,我们要介绍一个基本概念,也就是 RabbitMQ 中的 AMQP 协议。
那么,什么是 AMQP 呢?AMQP 的全称是 Advanced Message Queue Protocol,即高级消息队列协议。RabbitMQ 就是根据这个协议开发的。AMQP 是一个标准的协议,不仅 RabbitMQ,如果你想自己实现一个消息队列,也可以按照这个协议来设计。
AMQP 协议主要由几个部分组成,如下图所示,它非常适合我们来解释这个协议的各个组成部分。
🎉 安装
建议跟着官方文档进行安装: RabbitMQ: One broker to queue them all | RabbitMQ
安装RabbitMQ之前要先安装Erlang,下载的时候注意版本对应:Index of /download/,直接傻瓜式安装就行。然后安装rabbitmq,也是直接傻瓜式安装就行。然后去看一下rabbitmq服务是否开启
🎉 配置RabbitMQ
在开始菜单中找到RabbitMQ Command Promt,打开控制台
输入命令
rabbitmq-plugins enable rabbitmq_management
在启用插件成功后回看到一些提示信息,重启服务
然后进行访问localhost:15672这个客户端网址就行了,账号密码都是guest
提醒:
添加账号
由于guest这个用户,只能在本地访问,所以我们要新增一个用户admin,选择超级管理员权限
添加权限(使admin用户对虚拟主机“/” 具有所有权限):
最后就可以远程访问了。
🎉 常用端口
- 4369:epmd,RabbitMQ节点和CLI工具使用的对等发现服务
- 5672,5671:由AMQP 0-9-1和1.0客户端使用,不带TLS和TLS
- 25672:Erlang分发用于节点间和CLI工具通信,并从动态范围分配(默认情况下限制为单个端口,计算为AMQP端口+ 20000)。有关详细信息,请参阅网络指南。
- 15672:HTTP API客户端和rabbitmqadmin(仅当启用管理插件时)
- 61613,61414:没有和使用TLS的STOMP客户端(只有启用了STOMP插件)
- 1883,8883 :MQTT客户端没有和带有TLS,如果启用了MQTT插件
- 15674:STOMP-over-WebSockets客户端(只有启用了Web STOMP插件)
- 15675:MQTT-over-WebSockets客户端(仅当启用了Web MQTT插件时)
🎉 入门程序
这里建议看官网RabbitMQ Tutorials | RabbitMQ,我这里只是总结,官网上比较细致。
🎊Hello World!
先去下载依赖,你可以去 Maven repository 搜索 java 的客户端下载。
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.17.0</version>
</dependency>
生产者
package com.yupi.springbootinit.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;
// 定义一个名为SingleProducer的公开类,用于实现消息发送功能
public class SingleProducer {// 定义一个静态常量字符串QUEUE_NAME,它的值为"hello",表示我们要向名为"hello"的队列发送消息private final static String QUEUE_NAME = "hello";// 定义程序的入口点:一个公开的静态main方法,它抛出Exception异常public static void main(String[] argv) throws Exception {// 创建一个ConnectionFactory对象,这个对象可以用于创建到RabbitMQ服务器的连接ConnectionFactory factory = new ConnectionFactory();// 设置ConnectionFactory的主机名为"localhost",这表示我们将连接到本地运行的RabbitMQ服务器factory.setHost("localhost");// 如果你改了本地的用户名和密码,你可能要指定userName、userPassword,// 如果改了本地的端口,还要改Port。// 那我们这里不需要,我们这里就用默认的localhost,默认的用户名和密码,就是guest// factory.setUsername();// factory.setPassword();// factory.setPort();// 使用ConnectionFactory创建一个新的连接,这个连接用于和RabbitMQ服务器进行交互try (Connection connection = factory.newConnection();// 通过已建立的连接创建一个新的频道Channel channel = connection.createChannel()) {// 在通道上声明一个队列,我们在此指定的队列名为"hello"channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 创建要发送的消息,这里我们将要发送的消息内容设置为"Hello World!"String message = "Hello World!";// 使用channel.basicPublish方法将消息发布到指定的队列中。这里我们指定的队列名为"hello"channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));// 使用channel.basicPublish方法将消息发布到指定的队列中。这里我们指定的队列名为"hello"System.out.println(" [x] Sent '" + message + "'");}}
什么是频道channel?
你可以将频道看作客户端。你可能已经接触过其他类型的客户端,如 JDBC(用于连接数据库)和 Redis Client(用于操作缓存)。在这种情况下,你可以将频道看作是用于操作消息队列的客户端。
核心方法
- queueName:消息队列名称(注意,同名称的消息队列,只能用同样的参数创建一次)
- durabale:消息队列重启后,消息是否丢失
- exclusive:是否只允许当前这个创建消息队列的连接操作消息队列
- autoDelete:没有人用队列后,是否要删除队列
启动程序,可以看到
消费者
package com.ptu.bi.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;public class SingleConsumer {// 定义我们正在监听的队列名称private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {// 创建连接,创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置连接工厂的主机名,这里我们连接的是本地的RabbitMQ服务器factory.setHost("localhost");// 从工厂获取一个新的连接Connection connection = factory.newConnection();// 从连接中创建一个新的频道Channel channel = connection.createChannel();// 创建队列,在该频道上声明我们正在监听的队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 在控制台打印等待接收消息的信息System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 定义了如何处理消息,创建一个新的DeliverCallback来处理接收到的消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 将消息体转换为字符串String message = new String(delivery.getBody(), StandardCharsets.UTF_8);// 在控制台打印已接收消息的信息System.out.println(" [x] Received '" + message + "'");};// 在频道上开始消费队列中的消息,接收到的消息会传递给deliverCallback来处理,会持续阻塞channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
}
这里我们创建队列,主要是为了确保该队列的存在,否则在后续的操作中可能会出现错误。主要是为了这点,即便你的队列原本并不存在,此语句也能够帮你创建一个新的队列。但是需要特别注意一点,如果你的队列已经存在,并且你想再次执行声明队列的操作,那么所有的参数必须与之前的设置完全一致。这是因为一旦一个队列已经被创建,就不能再创建一个与其参数不一致的同名队列。可以类比为,一旦你建好了一个快递站,就不能再在同一位置建立一个与之不同的快递站。
看到消息已经被消费
🎊Work Queues
小技巧:
核心代码
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
第二个参数叫 autoack,默认为 false —— 消息确认机制。
消息队列如何确保消费者已经成功取出消息呢?它依赖一个称为消息确认的机制。当消费者从队列中取走消息后,必须对此进行确认。这就像在收到快递后确认收货一样,这样消息队列才能知道消费者已经成功取走了消息,并能安心地停止传输。因此,整个过程就像这样。
消息确认机制
为了保证消息成功被消费(快递成功被取走),rabbitmq 提供了消息确认机制,当消费者接收到消息后,比如要给一个反馈:
- ack:消费成功
- nack:消费失败
- reject:拒绝
建议将 autoack 设置为 false,根据实际情况手动进行确认了。
接收请求
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
拒绝请求
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
第二个参数 'multiple' 表示批量确认,也就是说,是否需要一次性确认所有的历史消息,直到当前这条消息为止。
第 3 个参数表示是否重新入队,可用于重试。
🎊Publish/Subscribe
这种的是Fanout模式——RabbitMQ发布订阅模式
Fanout 这种类型非常简单,它是将接收到的所有消息广播到它知道的所有队列中。RabbitMQ 系统中默认有一个 fanout 类型的交换机。
🎊Routing
上述 Fanout 这种交换类型并不能给我们带来很大的灵活性,它只能进行无意识的广播,在这里我们将使用 direct 类型来替换,direct 类型的工作方式是:消息只去到它绑定的 routingKey 队列中去。
🎊Topics
尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性。比方说我们想接收的日志类型有 error 和 warning 两种,但某个队列只想 error 的消息,那这个时候 direct 交换机就办不到了。这就引入了 topic
类型。
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词。
在这个规则列表中,其中有两个替换符是大家需要注意的:
- 星号*可以代替一个单词
- 井号#可以替代零个或多个单词
此外,当队列绑定关系是下列情况时需要引起注意:
- 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout
- 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct
🎉 RabbitMQ的一些机制
🎊消息应答机制
为了保证消息在发送过程中不丢失,RabbitMQ 引入消息应答机制,消息应答意思就是:消费者在接收消息并且处理完该消息之后,才告知 RabbitMQ 可以把该消息删除了。
RabbitMQ 中消息应答方式有两种:自动应答(默认)、手动应答
上面已经讲过了,这里不细讲了。
🎊持久化
前面我们通过手动应答处理了消息丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它会清空队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。
队列持久化:
之前我们创建的队列都是非持久化的,RabbitMQ 如果重启,该队列就会被删除掉,如果要队列实现持久化就需要在声明队列的时候把 durable
参数设置为 true
需要注意的是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会错误:
这样就可以了
消息持久化:
要想让消息实现持久化需要在消息生产者修改代码,添加MessageProperties.PERSISTENT_TEXT_PLAIN
属性。
🎊死信队列
死信就是无法被消费的消息。一般来说,producer 将消息投递到 broker 或者直接到 queue 中,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:
- 为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。
- 还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
死信的原因:
- 消息 TTL 过期
- 队列达到最大长度(队列满了无法再添加数据到 mq 中)
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false
🎊延迟队列
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
延时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息。