Redis入门第二步:Redis数据类型详解

在这里插入图片描述

摘要:
欢迎继续跟随《Redis新手指南:从入门到精通》专栏的步伐!在本文中,我们将深入探讨Redis支持的各种数据类型,这些类型是Redis强大功能的核心。通过学习不同的数据类型,你将能够根据具体的应用需求选择最合适的数据结构,从而实现更高效的数据处理。

五种基础数据类型🐲

Redis是Key-Value缓存型数据库,Redis为了存储不同类型的数据库,提供了常用的五种数据类型

  • string(字符串)
  • hash(哈希散列)
  • list(列表)
  • set(集合)
  • zset(sorted set:有序集合)
    在这里插入图片描述
1 String🔅

String是Redis中最基本的数据类型,一个Key对应一个value. Redis中的String数据类型可以包含任何数据,如数字,字符串,jpg图片或者序列化对象

命令使用:

命令简述使用
GET获取存储在给定键中的值GET name
SET设置存储在给定键中的值SET name value
DEL删除存储在给定键中的值DEL name
INCR将键存储的值加1INCR key
DECR将键存储的值减1DECR key
INCRBY将键存储的值加上整数INCRBY key amout
DECRBY将键存储的值减去整数DECRBY key amout
[root@localhost ~]# redis-cli 
127.0.0.1:6379> set name www.baidu.com
OK
127.0.0.1:6379> get name
"www.baidu.com"
127.0.0.1:6379> set name 1
OK
127.0.0.1:6379> incr name
(integer) 2
127.0.0.1:6379> incrby name 99
(integer) 101
127.0.0.1:6379> decrby name 98 
(integer) 3
127.0.0.1:6379> get name
"3"
127.0.0.1:6379> exit

实战场景:

  • 缓存:经典使用场景,把常用信息,字符串,图片或者视频等信息放到redis中,redis作为缓存层,mysql做持久化层,降低MySQL的读写压力
  • 计数器:redis是单线程模型,一个命令执行完才会执行下一个,同时数据可以一步落地到其他的数据源
  • session:常见方案spring session + redis 实现session 共享
2 List🔅

Redis中的List其实就是链表(Redis用双链表实现List). 使用List结构,我们可以轻松的实现最新消息排队功能(比如微博的TimeLine).List的另一个应用就是消息队列,可以利用List的PUSH操作,将任务存放在List中,然后工作线程再用POP操作将任务取出进行执行

命令使用:

命令描述使用
RPUSH将值推入到List右端RPUSH key value
LPUSH将值推入到List左端LPUSH key value
RPOP从List的右端弹出一个值,并返回被弹回的值RPOP key
LPOP从List的左端弹出一个值,并返回被弹回的值LPOP key
LRANGE获取列表在给定范围上的所有值LRANGE key 0 -1
LINDEX通过索引获取List中的元素,负数下表表示从列表的最后从前开始索引LINDEX key index

使用列表的技巧:

  • lpush + lpop = Stack(栈)
  • lpush + rpop = Queue(队列)
  • lpush + ltrim = Capped Collection(有限集合)
  • lpush + brpop = Message Queue(消息队列)

命令执行:

127.0.0.1:6379> lpush key 1 2 11 ls mem
(integer) 5
127.0.0.1:6379> lrange key 0 -1
1) "mem"
2) "ls"
3) "11"
4) "2"
5) "1"
127.0.0.1:6379> lindex key -1
"1"
127.0.0.1:6379> lindex key 10
(nil)
127.0.0.1:6379>

实战场景:

  • 微博TimeLine:有人发布微博,用lpush加入时间轴,展示新的列表信息
  • 消息队列
3 Set🔅

Redis的 Set 是 String 类型的无序集合。集合成员是唯一的,这就意味着集合中不能出现重复的数据. Redis 中集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是 O(1)

命令使用:

命令描述使用
SADD向集合添加一个或多个成员SADD Key value
SCARD获取集合的成员数SCARD Key
SMEMBERS返回集合中的所有成员SMEMBERS Key member
SISMEMBER判断member元素是否是集合key的成员SISMEMBER Key member

命令执行:

127.0.0.1:6379> sadd myset hao hao1 xiaohao hao
(integer) 3
127.0.0.1:6379> smembers myset
1) "hao1"
2) "xiaohao"
3) "hao"
127.0.0.1:6379> sismember myset hao
(integer) 1
127.0.0.1:6379> 

实战场景:

  • 标签(tag),给用户添加标签,或者用户给消息添加标签,这样有同一标签或者类似标签的可以推荐关注的事或者关注的人,如"猜你喜欢"
  • 点赞,收藏等,可以放到Set中实现
4 Hash🔅

Redis hash 是一个String类型的field(字段)和value(值)的映射表,hash特别适合用于存储对象

命令描述使用
HSET添加键值对HSET hash-key sub-key1 value1
HGET获取指定散列键的值HGET hash-key key1
HGETALL获取散列中包含的所有键值对HGETALL hash-key
HDEL如果给定键存在于散列中,那么就移除这个键HDEL hash-key sub-key
127.0.0.1:6379[5]> hset user name1 hao
(integer) 1
127.0.0.1:6379[5]> hset user email1 hao@163.com
(integer) 1
127.0.0.1:6379[5]> HGETALL user
1) "name1"
2) "hao"
3) "email1"
4) "hao@163.com"
127.0.0.1:6379[5]> hget user user
(nil)
127.0.0.1:6379[5]> hget user user1
(nil)
127.0.0.1:6379[5]> hget user name1
"hao"
127.0.0.1:6379[5]> hset user name2 xiaohao
(integer) 1
127.0.0.1:6379[5]> hset user email2 xiaohao@163.com
(integer) 1
127.0.0.1:6379[5]> hgetall user
1) "name1"
2) "hao"
3) "email1"
4) "hao@163.com"
5) "name2"
6) "xiaohao"
7) "email2"
8) "xiaohao@163.com"
127.0.0.1:6379[5]>

实战场景

  • 缓存:能直观,相比string更节省空间,的维护缓存信息,如用户信息,视频信息等
5 Zset有序集合🔅

Redis 有序集合和集合一样也是 string 类型元素的集合,且不允许重复的成员。不同的是每个元素都会关联一个 double 类型的分数。redis 正是通过分数来为集合中的成员进行从小到大的排序。

有序集合的成员是唯一的, 但分数(score)却可以重复。有序集合是通过两种数据结构实现:

  1. 压缩列表(ziplist): ziplist是为了提高存储效率而设计的一种特殊编码的双向链表。它可以存储字符串或者整数,存储整数时是采用整数的二进制而不是字符串形式存储。它能在O(1)的时间复杂度下完成list两端的push和pop操作。但是因为每次操作都需要重新分配ziplist的内存,所以实际复杂度和ziplist的内存使用量相关
  2. 跳跃表(zSkiplist): 跳跃表的性能可以保证在查找,删除,添加等操作的时候在对数期望时间内完成,这个性能是可以和平衡树来相比较的,而且在实现方面比平衡树要优雅,这是采用跳跃表的主要原因。跳跃表的复杂度是O(log(n))
命令描述使用
ZADD将一个带有给定分值的成员添加到有序集合里面ZADD zset-key 178 member1
ZRANGE根据元素在有序集合中所处的位置,从有序集合中获取多个元素ZRANGE zset-key 0-1
ZREM如果给定元素成员存在于有序集合中,那么就移除这个元素ZREM zset-key member1
ZSCORE获取有序集合(Sorted Set)中成员的分数(score)ZSCORE zset-key member1
127.0.0.1:6379> zadd myscoreset 100 hao 90 xiaohao
(integer) 2
127.0.0.1:6379> zrange myscoreset 0 -1
1) "xiaohao"
2) "hao"
127.0.0.1:6379> zscore myscoreset hao
"100"
127.0.0.1:6379> zscore myscoreset hao
"100"

实战场景

  • 排行榜:有序集合经典使用场景。例如小说视频等网站需要对用户上传的小说视频做排行榜,榜单可以按照用户关注数,更新时间,字数等打分,做排行。

三种特殊数据类型🐲

1.HyperLogLogs(基数统计)🔅

HyperLogLogs是Redis中的一种数据结构,用于估计集合中唯一元素的数量,而不需要存储实际的元素值. 举个例子,A = {1, 2, 3, 4, 5}, B = {3, 5, 6, 7, 9};那么基数(不重复的元素)= 1, 2, 4, 6, 7, 9;(允许容错,即可以接受一定误差)

HyperLogLogs基数统计结构可以非常省内存的去统计各种计数,比如注册IP数,每日访问IP数,页面实时UA,在线用户数,共同好友数等

通过一个例子来说明HyperLogLogs的优势:

一个大型的网站,每天 IP 比如有 100 万,粗算一个 IP 消耗 15 字节,那么 100 万个 IP 就是 15M。而 HyperLogLog 在 Redis 中每个键占用的内容都是 12K,理论存储近似接近 2^64 个值,不管存储的内容是什么,它一个基于基数估算的算法,只能比较准确的估算出基数,可以使用少量固定的内存去存储并识别集合中的唯一元素。而且这个估算的基数并不一定准确,是一个带有 0.81% 标准错误的近似值(对于可以接受一定容错的业务场景,比如IP数统计,UV 等,是可以忽略不计的)

命令描述使用
PFADD向HyperLogLogs数据结构添加一个或多个元素PFADD key element [element …]
PFCOUNT获取指定HyperLogLogs的基数估计值PFCOUNT key [key …]
PFMERGE将多个HyperLogLogs合并为一个新的HyperLogLogPFMERGE destkey sourcekey [sourcekey …]
127.0.0.1:6379> pfadd key1 a b c d e f g h i # 创建第一组元素
(integer) 1
127.0.0.1:6379> pfcount key1  # 统计元素的基数数量
(integer) 9
127.0.0.1:6379> pfadd key2 c j k l m e g a # 创建第二组元素
(integer) 1
127.0.0.1:6379> pfcount key2
(integer) 8
127.0.0.1:6379> pfmerge key3 key1 key2  # 合并两组:key1 key2 -> key3 并集
OK
127.0.0.1:6379> pfcount key3
(integer) 13
2.Bitmaps (位图) 🔅

Redis 中的 Bitmap(位存储)是一种数据结构,它可以用来存储位图,并对位图进行位操作。在 Redis 中,Bitmap 可以被用来存储二进制数据,并且可以高效地进行位操作。

Bitmap 的基本原理是将一个整数数组中的每个元素都看作一个位,可以存储 0 或 1。通过将一组位组合在一起,我们可以表示一个更大的数据结构,例如一个图像或一个文件。在 Redis 中,每个 Bitmap 都可以存储 1GB 的数据。

使用 Bitmap,我们可以对存储在其中的二进制数据进行位操作,例如按位与、按位或、按位异或等。此外,我们还可以使用 Bitmap 来检查一个值是否存在于一个集合中,或者计算两个集合的交集、并集等。

  • 用来解决什么问题?

比如:统计用户信息,活跃,不活跃!登录,未登录!打卡,不打卡!两个状态的,都可以使用 Bitmaps

如果存储一年的打卡状态需要多少内存呢?365 天 = 365 bit 1字节 = 8bit 46 个字节左右!

命令描述使用
BGET获取Bitmap中一个指定偏移量的值BGET key offset
BSET设置Bitmap中一个指定偏移量的值BSET key offset value
BINCRBY对Bitmap中指定偏移量的值进行自增操作BINCRBY key offset value
BZADD向Bitmap中添加一个或多个元素,并设置相应的分数BZADD key score member [member …]
BZCOUNT获取Bitmap中指定分数范围内的元素数量BZCOUNT key min max
BZMSUM获取Bitmap中所有元素的平均分,并添加到Bitmap的每个元素上BZMSUM key factor
BZREM从Bitmap中移除一个或多个元素BZREM key member [member …]
BZREMRANGEBYSCORE移除Bitmap中指定分数范围内的元素BZREMRANGEBYSCORE key min max
127.0.0.1:6379> setbit sign 0 1
(integer) 0
127.0.0.1:6379> setbit sign 1 1
(integer) 0
127.0.0.1:6379> setbit sign 2 0
(integer) 0
127.0.0.1:6379> setbit sign 3 1
(integer) 0
127.0.0.1:6379> getbit sign 3
(integer) 1
127.0.0.1:6379> bitcount sign
(integer) 3
127.0.0.1:6379>
3.geospatial (地理位置)🔅

依赖的Redis版本在redis3.2 及以上

Redis 的 Geospatial 是指 Redis 中的地理位置数据类型及其相关的命令和功能。Redis Geospatial 提供了对地理位置数据的存储、查询和分析的支持。

在 Redis Geospatial 中,可以使用以下命令来存储和操作地理位置数据:

  • GEOADD:用于将地理位置数据添加到 Redis 中。该命令接受一个键和多个经纬度坐标及相关的分数值。

例如:GEOADD mylocation 40.7128 -74.0060 "New York" 37.7749 -122.4194 "San Francisco"

  • GEODIST:用于计算两个地理位置之间的距离。该命令接受一个键、起始经纬度坐标和结束经纬度坐标,并返回它们之间的距离(以米为单位)。

例如:GEODIST mylocation 40.7128 -74.0060 37.7749 -122.4194 km

  • GEORADIUS:用于查询位于指定经纬度和半径范围内的地理位置。该命令接受一个键、中心经纬度坐标和半径,并返回在该范围内的地理位置及其分数值。

例如:GEORADIUS mylocation 40.7128 -74.0060 100 km

  • GEORADIUSBYMEMBER:用于查询位于指定成员和半径范围内的地理位置。该命令接受一个键、成员名称和半径,并返回在该范围内的地理位置及其分数值。

例如:GEORADIUSBYMEMBER mylocation "New York" 100 km

除了以上命令,Redis Geospatial 还提供了其他一些命令来处理地理位置数据,如 GEODEL(删除地理位置)、GEOINFO(获取地理位置信息)等。

需要注意的是,Redis Geospatial 的实现依赖于 Redis 的 Sorted Set 数据结构,因此在使用 Geospatial 功能时,需要了解 Sorted Set 的相关概念和操作方法

Stream 类型

1.为什么会设计Stream

Redis5.0 中还增加了一个数据结构 Stream,从字面上看是流类型,但其实从功能上看,应该是 Redis 对消息队列(MQ,Message Queue)的完善实现。

用过 Redis 做消息队列的都了解,基于 Reids的消息队列实现有很多种,例如:

  • PUB/SUB,订阅/发布模式

    • 但是发布订阅模式是无法持久化的,如果出现网络断开、Redis 宕机等,消息就会被丢弃;
  • 基于List LPUSH+BRPOP 或者 基于Sorted-Set的实现

    • 支持了持久化,但是不支持多播,分组消费等

为什么上面的结构无法满足广泛的消息队列场景? 这里便引出一个核心的问题:如果我们期望设计一种数据结构来实现消息队列,最重要的就是要理解设计一个消息队列需要考虑什么?初步的我们很容易想到

  • 消息的生产

  • 消息的消费

    • 单播和多播(多对多)

    • 阻塞和非阻塞读取

  • 消息有序性

  • 消息的持久化

其它还要考虑啥嗯?借助美团技术团队的一篇文章,消息队列设计图
在这里插入图片描述
我们不妨看看Redis考虑了哪些设计

  • 消息ID的序列化生成
  • 消息遍历
  • 消息的阻塞和非阻塞读取
  • 消息的分组消费
  • 未完成消息的处理
  • 消息队列监控

这也是我们需要理解Stream的点,但是结合上面的图,我们也应该理解Redis Stream也是一种超轻量MQ并没有完全实现消息队列所有设计要点,这决定着它适用的场景

2.Stream详解

经过梳理总结,我认为从以下几个大的方面去理解Stream是比较合适的,总结如下:

  • Stream的结构设计

  • 生产和消费

    • 基本的增删查改
    • 单一消费者的消费
    • 消费组的消费
  • 监控状态

3.Stream的结构

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建
在这里插入图片描述
上图解析:

  • Consumer Group :消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer), 这些消费者之间是竞争关系。
  • last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
  • pending_ids :消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。如果客户端没有ack,这个变量里面的消息ID会越来越多,一旦某个消息被ack,它就开始减少。这个pending_ids变量在Redis官方被称之为PEL,也就是Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。

此外我们还需要理解两点:

  • 消息ID: 消息ID的形式是timestampInMillis-sequence,例如1527846880572-5,它表示当前的消息在毫米时间戳1527846880572时产生,并且是该毫秒内产生的第5条消息。消息ID可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的ID要大于前面的消息ID。
  • 消息内容: 消息内容就是键值对,形如hash结构的键值对,这没什么特别之处
4.增删改查

消息队列相关命令:

XADD是Redis 6.2版本引入的一个新命令

命令详解
XADD添加消息到末尾
XTRIM对流进行修剪,限制长度
XDEL删除消息
XLEN获取流包含的元素数量,即消息长度
XRANGE获取消息列表,会自动过滤已经删除的消息
XREVRANGE反向获取消息列表,ID 从大到小
XREAD以阻塞或非阻塞方式获取消息列表
# *号表示服务器自动生成ID,后面顺序跟着一堆key/value
127.0.0.1:6379[2]> xadd codehole * name laoqian age 30 # 名字叫laoqian,年龄30岁
"1702088359499-0"
127.0.0.1:6379[2]> xadd codehole * name xiaoyu age 29
"1702088363852-0"
127.0.0.1:6379[2]> xadd codehole * name xiaoqian age 1
"1702088368106-0"
127.0.0.1:6379[2]> xlen codehole
(integer) 3
127.0.0.1:6379[2]> xrange codehole - +  # -表示最小值, +表示最大值
1) 1) "1702088359499-0"2) 1) "name"2) "laoqian"3) "age"4) "30"
2) 1) "1702088363852-0"2) 1) "name"2) "xiaoyu"3) "age"4) "29"
3) 1) "1702088368106-0"2) 1) "name"2) "xiaoqian"3) "age"4) "1"
127.0.0.1:6379[2]> xrange codehole 1702088368106-0 + # 指定最小消息ID的列表
1) 1) "1702088368106-0"2) 1) "name"2) "xiaoqian"3) "age"4) "1"
127.0.0.1:6379[2]> xdel codehole 1702088359499-0 # 删除指定消息ID
(integer) 1
127.0.0.1:6379[2]> xlen codehole  # 查看整个Stream的列表长度
(integer) 2
127.0.0.1:6379[2]> del codehole # 删除整个Stream 
(integer) 1
127.0.0.1:6379[2]> xlen codehole 
(integer) 0
127.0.0.1:6379[2]>
5.独立消费

我们可以在不定义消费组的情况下进行Stream消息的独立消费,当Stream没有新消息时,甚至可以阻塞等待。Redis设计了一个单独的消费指令xread,可以将Stream当成普通的消息队列(list)来使用。使用xread时,我们可以完全忽略消费组(Consumer Group)的存在,就好比Stream就是一个普通的列表(list)

127.0.0.1:6379[2]> xadd codehole * name laoqian age 30 # 名字叫laoqian,年龄30岁
"1702088359499-0"
127.0.0.1:6379[2]> xadd codehole * name xiaoyu age 29
"1702088363852-0"
127.0.0.1:6379[2]> xadd codehole * name xiaoqian age 1
"1702088368106-0"3 
127.0.0.1:6379[2]> xread count 2 streams codehole 0-0  # 从Stream头部读取两条消息
1) 1) "codehole"2) 1) 1) "1702088732976-0"2) 1) "name"2) "laoqian"3) "age"4) "30"2) 1) "1702088736610-0"2) 1) "name"2) "xiaoyu"3) "age"4) "29"
127.0.0.1:6379[2]> xread count 1 streams codehole $ # 从Stream尾部读取一条消息,毫无疑问,这里不会返回任何消息
(nil)
127.0.0.1:6379[2]> xread block 0 count 1 streams codehole $ # 从尾部阻塞等待新消息到来,下面的指令会堵住,直到新消息到来
1) 1) "codehole"2) 1) 1) "1702088785950-0"2) 1) "name"2) "youming"3) "age"4) "60"
(26.60s)
--------------------------------------------------------------------------------------------
# 我们从新打开一个窗口,在这个窗口往Stream里塞消息
# 再切换到前面的窗口,我们可以看到阻塞解除了,返回了新的消息内容
# 而且还显示了一个等待时间,这里我们等待了26.60s
127.0.0.1:6379[2]> xadd codehole * name youming age 60  
"1702088785950-0"

客户端如果想要使用xread进行顺序消费,一定要记住当前消费到哪里了,也就是返回的消息ID。下次继续调用xread时,将上次返回的最后一个消息ID作为参数传递进去,就可以继续消费后续的消息。

block 0表示永远阻塞,直到消息到来,block 1000表示阻塞1s,如果1s内没有任何消息到来,就返回nil

127.0.0.1:6379[2]> xread block 1000 count 1 streams codehole $
(nil)
(1.01s)
6.消费组消费

消费组消费图
在这里插入图片描述

命令详解
XGROUP CREATE创建消费者组
XREADGROUP GROUP读取消费者组中的消息
XACK -将消息标记为"已处理"
XGROUP SETID为消费者组设置新的最后递送消息ID
XGROUP DELCONSUMER删除消费者
XGROUP DESTROY删除消费者组
XPENDING显示待处理消息的相关信息
XCLAIM转移消息的归属权
XINFO查看流和消费者组的相关信息
XINFO GROUPS打印消费者组的信息
XINFO STREAM打印流信息

创建消费组

Stream通过xgroup create指令创建消费组(Consumer Group),需要传递起始消息ID参数用来初始化last_delivered_id变量

127.0.0.1:6379[2]> xgroup create codehole cg1 0-0 #  表示从头开始消费
OK
127.0.0.1:6379[2]> xgroup create codehole cg2 $ # $表示从尾部开始消费,只接受新消息,当前Stream消息会全部忽略
OK
127.0.0.1:6379[2]> xinfo stream codehole  # 获取Stream信息1) "length"2) (integer) 5  # 共5个消息3) "radix-tree-keys"4) (integer) 15) "radix-tree-nodes"6) (integer) 27) "last-generated-id"8) "1702088815554-0"9) "groups"
10) (integer) 2 # 两个消费组
11) "first-entry"  # 第一个消息
12) 1) "1702088732976-0"2) 1) "name"2) "laoqian"3) "age"4) "30"
13) "last-entry" # 最后一个消息
14) 1) "1702088815554-0"2) 1) "name"2) "leij"3) "age"4) "21"
127.0.0.1:6379[2]>  xinfo groups codehole # 获取Stream的消费组信息
1) 1) "name"2) "cg1"3) "consumers"4) (integer) 0 # 该消费组还没有消费者5) "pending"6) (integer) 0 # 该消费组没有正在处理的消息7) "last-delivered-id"8) "0-0"
2) 1) "name"2) "cg2"3) "consumers"4) (integer) 05) "pending"6) (integer) 07) "last-delivered-id"8) "1702088815554-0"

消费组消费

Stream提供了xreadgroup指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息ID。它同xread一样,也可以阻塞等待新消息。读到新消息后,对应的消息ID就会进入消费者的PEL(正在处理的消息)结构里,客户端处理完毕后使用xack指令通知服务器,本条消息已经处理完毕,该消息ID就会从PEL中移除

# >号表示从当前消费组的last_delivered_id后面开始读
# 每当消费者读取一条消息,last_delivered_id变量就会前进
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
1) 1) "codehole"2) 1) 1) 1527851486781-02) 1) "name"2) "laoqian"3) "age"4) "30"
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
1) 1) "codehole"2) 1) 1) 1527851493405-02) 1) "name"2) "yurui"3) "age"4) "29"
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 2 streams codehole >
1) 1) "codehole"2) 1) 1) 1527851498956-02) 1) "name"2) "xiaoqian"3) "age"4) "1"2) 1) 1527852774092-02) 1) "name"2) "youming"3) "age"4) "60"
# 再继续读取,就没有新消息了
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
(nil)
# 那就阻塞等待吧
127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole >
# 开启另一个窗口,往里塞消息
127.0.0.1:6379> xadd codehole * name lanying age 61
1527854062442-0
# 回到前一个窗口,发现阻塞解除,收到新消息了
127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole >
1) 1) "codehole"2) 1) 1) 1527854062442-02) 1) "name"2) "lanying"3) "age"4) "61"
(36.54s)
127.0.0.1:6379> xinfo groups codehole  # 观察消费组信息
1) 1) name2) "cg1"3) consumers4) (integer) 1  # 一个消费者5) pending6) (integer) 5  # 共5条正在处理的信息还有没有ack
2) 1) name2) "cg2"3) consumers4) (integer) 0  # 消费组cg2没有任何变化,因为前面我们一直在操纵cg15) pending6) (integer) 0
# 如果同一个消费组有多个消费者,我们可以通过xinfo consumers指令观察每个消费者的状态
127.0.0.1:6379> xinfo consumers codehole cg1  # 目前还有1个消费者
1) 1) name2) "c1"3) pending4) (integer) 5  # 共5条待处理消息5) idle6) (integer) 418715  # 空闲了多长时间ms没有读取消息了
# 接下来我们ack一条消息
127.0.0.1:6379> xack codehole cg1 1527851486781-0
(integer) 1
127.0.0.1:6379> xinfo consumers codehole cg1
1) 1) name2) "c1"3) pending4) (integer) 4  # 变成了5条5) idle6) (integer) 668504
# 下面ack所有消息
127.0.0.1:6379> xack codehole cg1 1527851493405-0 1527851498956-0 1527852774092-0 1527854062442-0
(integer) 4
127.0.0.1:6379> xinfo consumers codehole cg1
1) 1) name2) "c1"3) pending4) (integer) 0  # pel空了5) idle6) (integer) 745505
7.信息监控

Stream 提供了XINFO来实现对服务器信息的监控,可以查询:

  1. 查看队列信息

    127.0.0.1:6379> Xinfo stream mq1) "length"2) (integer) 73) "radix-tree-keys"4) (integer) 15) "radix-tree-nodes"6) (integer) 27) "groups"8) (integer) 19) "last-generated-id"
    10) "1553585533795-9"
    11) "first-entry"
    12) 1) "1553585533795-3"2) 1) "msg"2) "4"
    13) "last-entry"
    14) 1) "1553585533795-9"2) 1) "msg"2) "10"
    
  2. 消费组信息

    127.0.0.1:6379> Xinfo groups mq
    1) 1) "name"2) "mqGroup"3) "consumers"4) (integer) 35) "pending"6) (integer) 37) "last-delivered-id"8) "1553585533795-4"
    
  3. 消费者组成员信息

    127.0.0.1:6379> XINFO CONSUMERS mq mqGroup
    1) 1) "name"2) "consumerA"3) "pending"4) (integer) 15) "idle"6) (integer) 18949894
    2) 1) "name"2) "consumerB"3) "pending"4) (integer) 15) "idle"6) (integer) 3092719
    3) 1) "name"2) "consumerC"3) "pending"4) (integer) 15) "idle"6) (integer) 23683256
    
8.Stream用在什么样场景

1.可用作时通信等,大数据分析,异地数据备份等
在这里插入图片描述
2.客户端可以平滑扩展,提高处理能力
在这里插入图片描述
消息ID的设计是否考虑了时间回拨的问题?

在分布式算法 - ID算法设计中, 一个常见的问题就是时间回拨问题,那么Redis的消息ID设计中是否考虑到这个问题呢?

XADD生成的1553439850328-0,就是Redis生成的消息ID,由两部分组成:时间戳-序号。时间戳是毫秒级单位,是生成消息的Redis服务器时间,它是个64位整型(int64)。序号是在这个毫秒时间点内的消息序号,它也是个64位整型。

可以通过multi批处理,来验证序号的递增:

127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> XADD memberMessage * msg one
QUEUED
127.0.0.1:6379> XADD memberMessage * msg two
QUEUED
127.0.0.1:6379> XADD memberMessage * msg three
QUEUED
127.0.0.1:6379> XADD memberMessage * msg four
QUEUED
127.0.0.1:6379> XADD memberMessage * msg five
QUEUED
127.0.0.1:6379> EXEC
1) "1553441006884-0"
2) "1553441006884-1"
3) "1553441006884-2"
4) "1553441006884-3"
5) "1553441006884-4"

由于一个redis命令的执行很快,所以可以看到在同一时间戳内,是通过序号递增来表示消息的。

为了保证消息是有序的,因此 Redis 生成的 ID 是单调递增有序的。由于 ID 中包含时间戳部分,为了避免服务器时间错误而带来的问题(例如服务器时间延后了),Redis 的每个 Stream 类型数据都维护一个 latest_generated_id属性,用于记录最后一个消息的ID。若发现当前时间戳退后(小于latest_generated_id所记录的),则采用时间戳不变而序号递增的方案来作为新消息ID(这也是序号为什么使用int64的原因,保证有足够多的的序号),从而保证ID的单调递增性质。

强烈建议使用Redis的方案生成消息ID,因为这种时间戳+序号的单调递增的ID方案,几乎可以满足你全部的需求。但同时,记住ID是支持自定义的,别忘了!

消费者崩溃带来的会不会消息丢失问题?

为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM 设计了 Pending 列表,用于记录读取但并未处理完毕的消息。命令XPENDIING 用来获消费组或消费内消费者的未处理完毕的消息。演示如下:

127.0.0.1:6379> XPENDING mq mqGroup # mpGroup的Pending情况
1) (integer) 5 # 5个已读取但未处理的消息
2) "1553585533795-0" # 起始ID
3) "1553585533795-4" # 结束ID
4) 1) 1) "consumerA" # 消费者A有3个2) "3"2) 1) "consumerB" # 消费者B有1个2) "1"3) 1) "consumerC" # 消费者C有1个2) "1"127.0.0.1:6379> XPENDING mq mqGroup - + 10 # 使用 start end count 选项可以获取详细信息
1) 1) "1553585533795-0" # 消息ID2) "consumerA" # 消费者3) (integer) 1654355 # 从读取到现在经历了1654355ms,IDLE4) (integer) 5 # 消息被读取了5次,delivery counter
2) 1) "1553585533795-1"2) "consumerA"3) (integer) 16543554) (integer) 4
# 共5个,余下3个省略 ...127.0.0.1:6379> XPENDING mq mqGroup - + 10 consumerA # 在加上消费者参数,获取具体某个消费者的Pending列表
1) 1) "1553585533795-0"2) "consumerA"3) (integer) 16410834) (integer) 5
# 共3个,余下2个省略 ...

每个Pending的消息有4个属性:

  • 消息ID
  • 所属消费者
  • IDLE,已读取时长
  • delivery counter,消息被读取次数

上面的结果我们可以看到,我们之前读取的消息,都被记录在Pending列表中,说明全部读到的消息都没有处理,仅仅是读取了。那如何表示消费者处理完毕了消息呢?使用命令 XACK 完成告知消息处理完成,演示如下:

127.0.0.1:6379> XACK mq mqGroup 1553585533795-0 # 通知消息处理结束,用消息ID标识
(integer) 1127.0.0.1:6379> XPENDING mq mqGroup # 再次查看Pending列表
1) (integer) 4 # 已读取但未处理的消息已经变为4个
2) "1553585533795-1"
3) "1553585533795-4"
4) 1) 1) "consumerA" # 消费者A,还有2个消息处理2) "2"2) 1) "consumerB"2) "1"3) 1) "consumerC"2) "1"
127.0.0.1:6379>

有了这样一个Pending机制,就意味着在某个消费者读取消息但未处理后,消息是不会丢失的。等待消费者再次上线后,可以读取该Pending列表,就可以继续处理该消息了,保证消息的有序和不丢失

消费者彻底宕机后如何转移给其它消费者处理?

还有一个问题,就是若某个消费者宕机之后,没有办法再上线了,那么就需要将该消费者Pending的消息,转义给其他的消费者处理,就是消息转移。

消息转移的操作时将某个消息转移到自己的Pending列表中。使用语法XCLAIM来实现,需要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。演示如下:

# 当前属于消费者A的消息1553585533795-1,已经15907,787ms未处理了
127.0.0.1:6379> XPENDING mq mqGroup - + 10
1) 1) "1553585533795-1"2) "consumerA"3) (integer) 159077874) (integer) 4# 转移超过3600s的消息1553585533795-1到消费者B的Pending列表
127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
1) 1) "1553585533795-1"2) 1) "msg"2) "2"# 消息1553585533795-1已经转移到消费者B的Pending中。
127.0.0.1:6379> XPENDING mq mqGroup - + 10
1) 1) "1553585533795-1"2) "consumerB"3) (integer) 84404 # 注意IDLE,被重置了4) (integer) 5 # 注意,读取次数也累加了1次

以上代码,完成了一次消息转移。转移除了要指定ID外,还需要指定IDLE,保证是长时间未处理的才被转移。被转移的消息的IDLE会被重置,用以保证不会被重复转移,以为可能会出现将过期的消息同时转移给多个消费者的并发操作,设置了IDLE,则可以避免后面的转移不会成功,因为IDLE不满足条件。例如下面的连续两条转移,第二条不会成功。

127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
127.0.0.1:6379> XCLAIM mq mqGroup consumerC 3600000 1553585533795-1

这就是消息转移。至此我们使用了一个 Pending 消息的 ID,所属消费者和IDLE 的属性,还有一个属性就是消息被读取次数,delivery counter,该属性的作用由于统计消息被读取的次数,包括被转移也算。这个属性主要用在判定是否为错误数据上

坏消息问题,Dead Letter,死信问题

正如上面所说,如果某个消息,不能被消费者处理,也就是不能被XACK,这是要长时间处于Pending列表中,即使被反复的转移给各个消费者也是如此。此时该消息的delivery counter就会累加(上一节的例子可以看到),当累加到某个我们预设的临界值时,我们就认为是坏消息(也叫死信,DeadLetter,无法投递的消息),由于有了判定条件,我们将坏消息处理掉即可,删除即可。删除一个消息,使用XDEL语法,演示如下:

# 删除队列中的消息
127.0.0.1:6379> XDEL mq 1553585533795-1
(integer) 1
# 查看队列中再无此消息
127.0.0.1:6379> XRANGE mq - +
1) 1) "1553585533795-0"2) 1) "msg"2) "1"
2) 1) "1553585533795-2"2) 1) "msg"2) "3"

注意本例中,并没有删除 Pending 中的消息因此你查看Pending,消息还会在。可以执行 XACK 标识其处理完毕!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/436099.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Spring基础3】- Spring的入门程序

目录 3-1 Spring的下载3-2 Spring的 jar 包3-3 第一个 Spring程序第一步:添加spring context的依赖,pom.xml配置如下第二步:添加junit依赖第三步:定义bean:User第四步:编写spring的配置文件:bea…

技术成神之路:设计模式(十八)适配器模式

介绍 适配器模式(Adapter Pattern)是一种结构型设计模式,它允许接口不兼容的类可以协同工作,通过将一个类的接口转换成客户端所期望的另一个接口,使得原本由于接口不兼容而不能一起工作的类可以一起工作。 1.定义 适配…

python编程开发“人机猜拳”游戏

👨‍💻个人主页:开发者-曼亿点 👨‍💻 hallo 欢迎 点赞👍 收藏⭐ 留言📝 加关注✅! 👨‍💻 本文由 曼亿点 原创 👨‍💻 收录于专栏&#xff1a…

计算机毕业设计 基于深度学习的短视频内容理解与推荐系统的设计与实现 Python+Django+Vue 前后端分离 附源码 讲解 文档

🍊作者:计算机编程-吉哥 🍊简介:专业从事JavaWeb程序开发,微信小程序开发,定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事,生活就是快乐的。 🍊心愿:点…

【架构】前台、中台、后台

文章目录 前台、中台、后台1. 前台(Frontend)特点:技术栈: 2. 中台(Middleware)特点:技术栈: 3. 后台(Backend)特点:技术栈: 示例场景…

万界星空科技铜拉丝行业MES系统,实现智能化转型

一、铜拉丝行业生产管理的难点主要体现在以下几个方面: 1、标准严格:铜线产品对质量的要求极高,特别是在电气性能、导电性、耐腐蚀性等方面,任何微小的瑕疵都可能影响产品的使用效果和安全性。 2、过程监控:生产过程…

极速 JavaScript 打包器:esbuild

文章目录 前言什么是esbuild?esbuild如何实现如此出色的性能?基本配置入口文件输出文件模块格式targetplatformexternalbanner和footer 结论 前言 esbuild是一个快速、可扩展的JavaScript打包器和压缩器,它的目标是成为最快的打包器。它使用…

【C++篇】启航——初识C++(下篇)

接上篇【C篇】启航——初识C(上篇) 目录 一、引用 1.引用的概念 2.引用的基本语法 3.引用的特点 3.1 别名 3.2 不占用额外内存 3.3 必须初始化 3.4 不能为 NULL 4.引用的使用 4.1 函数参数传递 4.2 返回值 4.3 常量引用 5.引用和指针的关…

Spring Task 2024/9/30

Spring Task是Spring框架提供的任务调度工具,可以按照约定时间自动执行某个代码逻辑。 作用:定时自动执行某段java代码。 cron表达式 在线Cron表达式生成器 (qqe2.com)👈在线生成网站 入门案例 SkyApplication 启动类 package com.sky;im…

盛事启幕 | 第三届OpenHarmony技术大会重磅官宣,邀您共绘智联未来

未来已来,科技何向? ——10月12日-13日众多大咖齐聚上海 聚焦OpenHarmony生态前沿 与您一同解码技术的下一片蓝海

C# 委托(Delegate)一

一.Delegate的定义说明: C# 中的委托(Delegate)就是类似于 C 或 C 中函数的指针。Delegate 是存有对某个方法引用的一种引用类型变量,引用可在运行时是可以被改变的,特别适用于实现事件和回调方法。所有的Delegate都是…

网络基础概念和 socket 编程

网络基础概念和 socket 编程 学习目标: 了解 OSI 七层模型、TCP/IP 四层模型结构了解常见的网络协议格式掌握网络字节序和主机字节序之间的转换理解 TCP 服务器端通信流程理解 TCP 客户端通信流程实现 TCP 服务器端和客户端的代码 推荐一个非常好的学习资料仓库 协…

简单线性回归分析-基于R语言

本题中&#xff0c;在不含截距的简单线性回归中&#xff0c;用零假设对统计量进行假设检验。首先&#xff0c;我们使用下面方法生成预测变量x和响应变量y。 set.seed(1) x <- rnorm(100) y <- 2*xrnorm(100) &#xff08;a&#xff09;不含截距的线性回归模型构建。 &…

计算机网络(九) —— Tcp协议详解

目录 一&#xff0c;关于Tcp协议 二&#xff0c;Tcp报头字段解析 2.0 协议字段图示 2.1 两个老问题 2.2 16位窗口大小 2.3 32位序号和确认序号 2.4 6个标记位 三&#xff0c;Tcp保证可靠性策略 3.1 确认应答机制&#xff08;核心&#xff09; 3.2 超时重传机制 3.3 …

基于开源WQ装备知识图谱的智能问答优化2

基于笔者之前写的博客基础上&#xff1a;https://blog.csdn.net/zhanghan11366/article/details/142139488【基于开源WQ装备知识图谱的智能问答全流程构建】进行优化。新增处理基于特定格式下的WQ文档&#xff0c;抽取文档的WQ属性和关系&#xff0c;并抽取对应WQt图片存储至mi…

位运算(3)_判定字符是否唯一_面试题

个人主页&#xff1a;C忠实粉丝 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 C忠实粉丝 原创 位运算(3)_判定字符是否唯一_面试题 收录于专栏【经典算法练习】 本专栏旨在分享学习算法的一点学习笔记&#xff0c;欢迎大家在评论区交流讨论&#x1f48c; 目…

c++11~c++20 结构化绑定

结构化帮绑定可以作用于3中类型 一、原生数组类型 结果&#xff1a; 备注&#xff1a;绑定到原生数组所需条件仅仅是要求别名的数量于数组元素的个数一致&#xff0c;这里的x&#xff0c;y&#xff0c;z分别绑定到a[0],a[1],a[2] 二、绑定到结构体和类对象 结果&#xff1a;…

selenium测试框架快速搭建详解

一、介绍 Selenium目前主流的web自动化测试框架&#xff1b;支持多种编程语言Java、pythan、go、js等&#xff1b;selenium 提供一系列的api 供我们使用&#xff0c;因此在web测试时我们要点页面中的某一个按钮&#xff0c;那么我们只需要获取页面&#xff0c;然后根据id或者n…

JQuery基本介绍和使用方法

JQuery基本介绍和使用方法 W3C 标准给我们提供了⼀系列的函数, 让我们可以操作: ⽹⻚内容⽹⻚结构⽹⻚样式 但是原⽣的JavaScript提供的API操作DOM元素时, 代码⽐较繁琐, 冗⻓. 我们可以使⽤JQuery来操作⻚⾯对象. jQuery是⼀个快速、简洁且功能丰富的JavaScript框架, 于20…

uniapp数据缓存

利用uniapp做开发时&#xff0c;缓存数据是及其重要的&#xff0c;下面是同步缓存和异步缓存的使用 同步缓存 在执行同步缓存时会阻塞其他代码的执行 ① uni.setStorageSync(key, data) 设置缓存&#xff0c;如&#xff1a; uni.setStorageSync(name, 张三) ② uni.getSt…