当前位置:

【Redis笔记】Redis消息队列方案

访客 2024-04-24 1071 0

Reids消息队列(Message Queue)

消息队列 是指利用 高效可靠 的 消息传递机制 进行与平台无关的 数据交流,并基于数据通信来进行分布式系统的集成。
消息队列具有 低耦合、可靠投递、广播、流量控制、最终一致性 等功能。
常见的消息队列 有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ 等。
通过提供 消息传递 和 消息排队 模型,它可以在 分布式环境 下提供 应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步 等功能。

基于List结构的模拟消息队列

Redis的list数据结构是一个双向链表,使用出队入队即可实现消息队列。

LPUSH 、RPOP 或 RPUSH 、 LPOP

LPUSH:将一个或多个值value插入到列表key的表头如果有多个value值,那么各个value值按从左到右的顺序依次插入到表头。
RPOP:移除并返回列表key的尾元素。
RPUSH与LPOP同理。
通过 LPUSH,RPOP 这样的方式,会存在一个性能风险点,就是消费者如果想要及时的处理数据,就要在程序中写个类似 while(true) 这样的逻辑,不停地去调用 RPOP 或 LPOP 命令,这就会给消费者程序带来些不必要的性能损失。

LPUSH、BRPOP 或 RPUSH、BLPOP

Redis 还提供了 BLPOP、BRPOP 这种阻塞式读取的命令(带 B-Bloking的都是阻塞式),客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。这种方式就节省了不必要的 CPU 开销。

数据存入格式:lpush listname v1 v2 v3

v 表示存入链表的值

阻塞等待指令格式:blpop list_name timeout

listname 为 取出内容的列表名
timeout为等待超时时间,如果为0,则可无限等待

优点

  • 利用Redis存储,不受限于JVM内存
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点

  • 无法避免消息丢失:从redis中取出消息后,如果尚未处理完出现异常,取出的消息就丢失了
  • 只支持单消费者,一条消息被取走后,其他消费者无法再获取。

基于PubSub的消息队列

"发布/订阅"模式包含两种角色,分别是发布者和订阅者。订阅者可以订阅一个或者多个频道(channel),而发布者可以向指定的频道(channel)发送消息,所有订阅此频道的订阅者都会收到此消息。

常用的指令:
subscribe channel1 [channel...]: 订阅一个或多个频道
publish channel1 msg:向一个频道发送消息
psubscribe pattern [pattern]:订阅与pattern格式匹配的所有频道

pattern通配符:
h?llo:?可以替换为任意一个其他字母,比如hello;而hllo和hkkllo不行
hllo: 可以替换为0个或多个其他字母,比如hllo、hello、heeeello
h[ae]llo:可以替换为任意一个中括号中的字母,比如hallo、hello;而hillo不行

优点

  • 采用发布订阅模型,支持多生产、多消费

缺点

  • 不支持数据持久化;如果出现网络断开、Redis 宕机等,消息就会被丢弃。假设一个消费者都没有,那消息就直接被丢弃了。
  • 无法避免消息丢失。
  • 消息堆积有上限、超出时数据丢失。

基于Stream的消息队列——单消费方式

Redis 5.0 版本新增了一个更强大的数据结构——Stream。它提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

存入消息的方式之一_XADD

命令格式:XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...]

NOMKSTREAM:如果队列不存在,是否自动创建队列,默认自动创建
<MAXLEN | MINID> [= | ~] threshold [LIMIT count] :设置消息队列的最大消息数量
<* | id> 消息的唯一id,* 代表由redis自动生成。* 格式是"时间戳-递增数字",例如"1644804662707-0"
field value [field value …]:发送到队列中的消息队列,称为Entry。格式就是多个key-value键值对。

示例:

# 创建名为 users 的队列,并向其中发送一个消息,内容是:{name=jack,age=21},并且使用Redis自动生成idXADD users * name jack age 21

读取消息的方式之一——XREAD

命令格式:XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

[COUNT count]:每次读取消息的最大数量
[BLOCK milliseconds]:当没有消息时,是否阻塞、阻塞时长
STREAMS key [key …]:要从哪个队列读取消息,key就是队列名
起始id,只返回大于该id的消息;0代表从第一个消息开始;$代表从最新的消息开始

读取最新的消息示例:

XREAD COUNT 1 BLOCK 1000 STREAMS users $

优点

  • 消息可回溯
  • 一个消息可被多个消费者读取
  • 可以阻塞读取

缺点

  • 有消息漏读风险

基于Stream的消息队列——消费者组

消费者组:将多个消费者划分到一个组里,监听同一个队列。

特点

  • 消费分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,加快消息的处理速度。
  • 消息标识:消费者组会维护一个标示,记录组内最后一个被处理的消息,哪怕消费者宕机重启,还是能够从标示之后读取消息,确保每一个消息都被消费。
  • 消息确认:消费者获取消息之后,消息处于pending状态,并存入一个pending-list。当处理完后需要XACK来确认消息,标记消息已处理,之后才会从pending-list移除。

创建消费者组

XGROUP CREATE key groupName ID [MKSTREAM]

key:队列名称
groupName:消费者组名称
ID:起始ID标示,$代表队列中最后一个消息,0代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列

其他对应指令:

# 删除指定的消费者组XGROUP DESTROY key group# 给指定的消费者组添加消费者XGROUP CREATECONSUMER key group consumer# 删除消费者组中的指定消费者XGROUP DELCONSUMER key group consumer

从消费者组读取消息

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds][NOACK] STREAMS key [key ...] id [id ...]

group:消费者组名称
consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
count:本次查询的最大数量
BLOCK milliseconds:当前消息等待最大时长
NOACK:无需手动ACK,获取消息后自动确认
STREAMS key:指定监听一个或多个队列名称
ID:获取消息的起始ID:

  • “ >” 从下一个未消费的消息开始
  • 其他:根据指定ID从pending-list中获取一个消费但未确认的消息,例如0,是从pending-list中第一个消息开始

确认消息

XACK key group id [id ...]

key:消息队列名称
group:组名
id:确认的消息ID

查看未确认的消息

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]

总结

如果业务要求较高,可以考虑使用更加专业的 Kafka、RocketMQ、RabbitMQ。

发表评论

  • 评论列表
还没有人评论,快来抢沙发吧~