当前位置: 首页 > news >正文

Redis:基于PubSub(发布/订阅)、Stream流实现消息队列

Redis - PubSub、Stream流

文章目录

  • Redis - PubSub、Stream流
    • 1.基于List的消息队列
    • 2.基于PubSub的消息队列
    • 3.基于Stream的消息队列
      • 1.Redis Streams简介
      • 2.Redis Streams基本命令
        • 1.XADD 添加消息到末尾
        • 2.XLEN 获取消息长度
        • 3.XREAD 读取消息 (单消费模式)
        • 4.XGROUP 消费组操作
        • 5.XREADGROUP GROUP 从消费组读取消息
        • 6.XACK 消息确认
        • 7.XPENDING 查看pend数据

1.基于List的消息队列

  • 由于redis的list数据结构为双向链表,则可以通过lpush和rpop来模拟队列效果
  • 由于队列没有消息时候,需要阻塞获取队列数据,而lpop和rpop在空队列获取数据时会返回null,所以需要使用brpop和blpop来进行阻塞获取
#向data1的list存两个数据
lpush data1  aaa bbb
#右监听data1  等待20秒
brpop data1 20

在这里插入图片描述

缺点:

  • 无法避免消息丢失,
  • 只支持单消费者,无法广播

2.基于PubSub的消息队列

基于发布订阅形式,可以广播,生产者向channel(信道)发送消息,可以由多个消息者去订阅,订阅的消费者都可以收到消息

 SUBSCRIBE channel [channel ...]  #订阅一个或多个信道
 PUBLISH channel message  #向一个信道发送消息
 PSUBSCRIBE pattern [pattern ...]  #通过通配符匹配订阅的信道  匹配规则  ?代表一个字符   []代表中括号内的可选字符  *代表任意字符
SUBSCRIBE  log 
PUBLISH  log zhangsan   

在这里插入图片描述

缺点:

  • 不支持持久化
  • 消息有上限,超出会导致消息丢失

3.基于Stream的消息队列

1.Redis Streams简介

官方文档:https://redis.io/docs/latest/commands/xadd/

Redis Stream是redis在5.x版本引入的新特性,Redis流是一种数据结构,它类似于一个只可追加的日志,但也实现了多种操作,以克服典型只可追加日志的一些限制。这些操作包括O(1)时间的随机访问和复杂的消费策略,如消费者组。你可以使用流来记录并同时实时分发事件。Redis流的使用案例包括:

  • 事件溯源(例如,跟踪用户操作、点击等)
  • 传感器监测(例如,现场设备的读数)
  • 通知(例如,将每个用户的通知记录存储在单独的流中)

Redis为每个流条目生成一个唯一的ID。你可以使用这些ID在后续检索与其关联的条目,或者读取并处理流中的所有后续条目。请注意,由于这些ID与时间相关,这里显示的ID可能会有所不同,与你自己的Redis实例中看到的ID也会有所不同。
Redis流支持多种修剪策略(以防止流无限制地增长)和多种消费策略(参见XREAD、XREADGROUP和XRANGE)。

2.Redis Streams基本命令

stream消息队列相关命令:

  • XADD - 添加消息到末尾
  • XTRIM - 对流进行修剪,限制长度
  • XDEL - 删除消息
  • XLEN - 获取流包含的元素数量,即消息长度
  • XRANGE - 获取消息列表,会自动过滤已经删除的消息
  • XREVRANGE - 反向获取消息列表,ID 从大到小
  • XREAD - 以阻塞或非阻塞方式获取消息列表

消费者组相关命令:

  • XGROUP CREATE - 创建消费者组
  • XGROUP CREATECONSUMER 给指定的消费者组添加消费者
  • XREADGROUP GROUP - 读取消费者组中的消息
  • XACK - 将消息标记为"已处理"
  • XGROUP SETID - 为消费者组设置新的最后递送消息ID
  • XGROUP DELCONSUMER - 删除消费者
  • XGROUP DESTROY - 删除消费者组
  • XPENDING - 显示待处理消息的相关信息
  • XCLAIM - 转移消息的归属权
  • XINFO - 查看流和消费者组的相关信息;
  • XINFO GROUPS - 打印消费者组的信息;
  • XINFO STREAM - 打印流信息
1.XADD 添加消息到末尾

1.基本语法

XADD是唯一可以向流中添加数据的 Redis 命令,但 还有其他命令,例如 XDELXTRIM,能够 从流中删除数据。

XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...]
  • key: 队列名
  • [NOMKSTREAM]:队列不存在是否自动创建,默认自动创建
  • [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] :设置消息队列的最大消息数量
  • <* | id>:消息唯一id,*代表消息由redis自动生成,格式为时间戳-递增序列,可以手动指定
  • field value:字段和值(键值对),可以一次添加多个
XADD users * name zhangsan age 18  #向user发送一条name为zhangsan,age为18的消息,返回消息id

在这里插入图片描述

2.指定stream的id参数

1526919030474-55

ID标识流中的给定消息数据。
如果指定的ID参数是*字符,XADD命令将自动生成一个唯一的ID。然而,尽管仅在极少数情况下有用,但可以指定一个格式良好的ID,以便新条目将与指定的ID完全相同。

XADD mystream 1526919030474-55 message "Hello," 

当自动生成ID时,第一部分是Redis实例生成ID的Unix时间(毫秒)。第二部分只是一个序列号,用于区分在同一毫秒内生成的ID。

XADD mystream 1526919030474-* message " World!"

还可以指定一个不完整的ID,只自动生成序列号部分(注意:6.0版本不支持报错

stream的数据是有序的,所以消息的id始终的递增的,如果手动指定一个小于上一条数据的id则会出错

2.XLEN 获取消息长度
XLEN users  #返回消息个数
3.XREAD 读取消息 (单消费模式)

基础语法

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
  • [COUNT count] :每次读取的最大数量
  • [BLOCK milliseconds]:当消息没有时,是否阻塞,阻塞时间(毫秒),不阻塞就不给值,如果给0则永久阻塞等待
  • STREAMS key: 从那个队列读取消息,key为读取的队列名
  • id [id ...]:起始id,代表从那个id的消息开始读取;0代表从第一个$代表从最新的消息读取

测试

 xread count 1 streams users 0  #读取users中最开始的一条数据

在这里插入图片描述

消息读取后不会删除,所有消费者都可以重复获取

 xread count 1 streams users $  #读取最新消息 ,返回为nil空xread count 1 block 0 streams users $    #永久阻塞读取

在这里插入图片描述

但是阻塞方式监听到消息后会关闭,需要重新监听

此时在开发中我们可以使用死循环来无限读取最新消息进行监听

但是: 当指定起始id为$时,代表读取最新消息,如果处理消息过程中,又有超过一条以上的消息到达,则下次也只能获取一条最新的消息会导致其他数据漏读

4.XGROUP 消费组操作
  • 消费者组:将多个消费者划分到同一个消费组,监听同一个队列

  • 分流消费:队列中的消费将会分流给消费者组中的消费者,不会重复消费,加快消息消费速度

  • 消息标识:消费者组会维护一个标识,记录最后一个被处理(非最新)的消息,即使redis挂机重启,也可以按照标识恢复读取,确保消息消费

  • 消息确认机制:消费者获取消息后,消息处于pending状态,并存入一个pend-list,当处理完成时通过XACK来确认消息,标记为已处理,才pend-list移除

XGROUP CREATE 创建消费者组

XGROUP CREATE key group <id | $> [MKSTREAM] [ENTRIESREAD entries-read]
  • key:队列名称
  • group: 消费者组名称
  • <id | $>:起始id标识, 0代表第一个, $代表最新消息
  • [MKSTREAM]:队列不存在时自动创建队列,如果不存在且不指定会报错
  • [ENTRIESREAD entries-read]: redis7.0后的参数,

创建消费者组g1

XGROUP CREATE users g1 0

在这里插入图片描述

XGROUP CREATECONSUMER 给指定的消费者组添加消费者

XGROUP CREATECONSUMER key group consumer

XGROUP DESTROY 删除指定的消费者组

XGROUP DESTROY key group

XGROUP DELCONSUMER 删除消费者组中指定的消费者

XGROUP DELCONSUMER key group consumer
5.XREADGROUP GROUP 从消费组读取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
  • group: 消费者组名称
  • consumer:消费者名称,如果不存在会自动创建
  • [COUNT count] :每次读取的最大数量
  • [BLOCK milliseconds]:当消息没有时,是否阻塞,阻塞时间(毫秒),不阻塞就不给值,如果给0则永久阻塞等待
  • STREAMS key: 从那个队列读取消息,key为读取的队列名
  • [NOACK] 无需消息确认(类似自动确认)。
  • id [id ...]:起始id

**注意:**id取值:">" :从下一个未消费的消息开始,非最新消息,确保都消费
其他数字:根据指定id从pend-list中获取已消费但未确认消息,例如0,从pend-list第一个消息开始

所以当正常处理时的id都采用">" 进行消费,如果出现异常可以指定0,每次都读取第一个pend-list的消息,即每次都是读取最新的未处理数据,将异常数据处理掉

测试

XREADGROUP GROUP  g1 c1 COUNT 1 BLOCK 2000 STREAMS users >

在这里插入图片描述

6.XACK 消息确认
XACK key group id [id ...]

测试

XACK users g1 1733738565351-0 1733738570018-0 1733738567511-0 1733738587327-0

在这里插入图片描述

7.XPENDING 查看pend数据
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
  • key:队列名称
  • group: 消费者组名称
  • [IDLE min-idle-time]::查看过去空闲时间的以上的消息,比如给5000,则查询空闲时间5000ms以上的消息
  • start end 消息起始范围 “- +”代表所有
  • count: 获取数量
  • [consumer]: 获取那个消费者的

测试

XPENDING users g1 - + 10

在这里插入图片描述

参考来源:https://www.bilibili.com/video/BV1cr4y1671t/?spm_id_from=333.788.videopod.episodes&vd_source=97a7d9497f7eb9e537f6b50df8831e27&p=75

相关文章:

Redis:基于PubSub(发布/订阅)、Stream流实现消息队列

Redis - PubSub、Stream流 文章目录 Redis - PubSub、Stream流1.基于List的消息队列2.基于PubSub的消息队列3.基于Stream的消息队列1.Redis Streams简介2.Redis Streams基本命令1.XADD 添加消息到末尾2.XLEN 获取消息长度3.XREAD 读取消息 &#xff08;单消费模式&#xff09;4…...

C#飞行棋(新手简洁版)

我们要在主函数的顶部写一些全局静态字段 确保能在后续的静态方法中能够获取到这些值和修改 static int[] Maps new int[100];static string[] PlayerName new string[2];static int[] PlayerScore new int[2];static bool[] Flags new bool[2] {true,true }; static int[]…...

【OpenCV】图像转换

理论 傅立叶变换用于分析各种滤波器的频率特性。对于图像&#xff0c;使用 2D离散傅里叶变换&#xff08;DFT&#xff09; 查找频域。快速算法称为 快速傅立叶变换&#xff08;FFT&#xff09; 用于计算DFT。 Numpy中的傅立叶变换 首先&#xff0c;我们将看到如何使用Numpy查…...

力扣 重排链表-143

重排链表-143 /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}* ListNode(int x) : val(x), next(nullptr) {}* ListNode(int x, ListNode *next) : val(x), next(next)…...

【Kubernetes理论篇】容器集群管理系统Kubernetes(K8S)

Kubernetes集群部署基本管理实战 这么好的机会&#xff0c;还在等什么&#xff01; 01、Kubernetes 概述 K8S是什么 K8S 的全称为 Kubernetes (K12345678S)&#xff0c;PS&#xff1a;“嘛&#xff0c;写全称也太累了吧&#xff0c;写”。不如整个缩写 K8s 作为缩写的结果…...

Kubernetes 常用操作大全:全面掌握 K8s 基础与进阶命令

Kubernetes&#xff08;简称 K8s&#xff09;作为一种开源的容器编排工具&#xff0c;已经成为现代分布式系统中的标准。它的强大之处在于能够自动化应用程序的部署、扩展和管理。在使用 Kubernetes 的过程中&#xff0c;熟悉常用操作对于高效地管理集群资源至关重要。本文将详…...

爬虫基础之Web网页基础

网页的组成 网页可以分为三大部分–HTML、CSS 和 JavaScript。如果把网页比作一个人&#xff0c;那么 HTML 相当于骨架、JavaScript 相当于肌肉、CSS 相当于皮肤&#xff0c;这三者结合起来才能形成一个完善的网页。下面我们分别介绍一下这三部分的功能。 HTML HTML(Hypertext…...

k8s, deployment

控制循环&#xff08;control loop&#xff09; for {实际状态 : 获取集群中对象X的实际状态&#xff08;Actual State&#xff09;期望状态 : 获取集群中对象X的期望状态&#xff08;Desired State&#xff09;if 实际状态 期望状态{什么都不做} else {执行编排动作&#xf…...

使用ensp搭建OSPF+BGP和静态路由,底层PC使用dhcp,实现PC互通

1.4种方式&#xff0c;实现PC2可以互通底层的所有设备 OSPF&#xff1a;OSPF是一种用于互联网协议网络的链路状态路由协议 BGP&#xff1a;是一种用于互联网上进行路由和可达性信息传递的外部网关协议&#xff08;EGP&#xff09; 静态路由&#xff1a; 静态路由是一种路由方…...

TÜLU 3: Pushing Frontiers in Open Language Model Post-Training

基本信息 &#x1f4dd; 原文链接: https://arxiv.org/abs/2411.15124&#x1f465; 作者: Nathan Lambert, Jacob Morrison, Valentina Pyatkin, Shengyi Huang, Hamish Ivison, Faeze Brahman, Lester James V. Miranda, Alisa Liu, Nouha Dziri, Shane Lyu, Yuling Gu, Sau…...

深入解读 MySQL EXPLAIN 与索引优化实践

MySQL 是当今最流行的关系型数据库之一&#xff0c;为了提升查询性能&#xff0c;合理使用 EXPLAIN 工具和优化索引显得尤为重要。本文将结合实际示例&#xff0c;探讨如何利用 EXPLAIN 分析查询执行计划&#xff0c;并分享索引优化的最佳实践。 一、EXPLAIN 工具简介 EXPLAIN …...

Flume——进阶(agent特性+三种结构:串联,多路复用,聚合)

目录 agent特性ChannelSelector描述&#xff1a; SinkProcessor描述&#xff1a; 串联架构结构图解定义与描述配置示例Flume1&#xff08;监测端node1&#xff09;Flume3&#xff08;接收端node3&#xff09;启动方式 复制和多路复用结构图解定义描述配置示例node1node2node3启…...

ragflow连ollama时出现的Bug

ragflow和ollama连接后&#xff0c;已经添加了两个模型但是ragflow仍然一直warn&#xff1a;Please add both embedding model and LLM in Settings &#xff1e; Model providers firstly.这里可能是我一开始拉取的镜像容器太小&#xff0c;容不下当前添加的模型&#xff0c;导…...

基于centos7.7编译Redis6.0

背景&#xff1a; OS&#xff1a;CentOs 7.7 Redis: 6.0.6 编译构建报错如下&#xff1a; In file included from server.c:30:0: server.h:1044:5: error: expected specifier-qualifier-list before ‘_Atomic’_Atomic unsigned int lruclock; /* Clock for LRU eviction …...

uni-app项目无法在Android Studio模拟器上运行

目录 1 问题描述2 尝试解决3 引发原因4 解决方法4.1 换用 MuMu 模拟器 5 结语 1 问题描述 在使用 uni-app 开发 Pad 端 App 时&#xff0c;初始化项目后打算先运行一下确保初始化正常。打开 Android Studio 模拟器后&#xff0c;然后在 HbuilderX 中选择使用 App 标准基座 运…...

第一部分:Linux系统(基础及命令)

Linux操作系统的实操性非常强&#xff0c;纯操作&#xff0c;不适用于日常的办公使用 1.初始Linux 1.1 操作系统概述 1.1.1 了解OS的作用 OS&#xff1a;是计算机软件的一种&#xff0c;主要负责&#xff1a;作为用户和计算机硬件之间的桥梁&#xff0c;调度和管理计算机硬…...

No module named ‘_ssl‘ No module named ‘_ctypes‘

如果你使用的是基于 yum 的 Linux 发行版&#xff08;例如 CentOS、RHEL、Fedora&#xff09;&#xff0c;安装 libc6-dev 的方式稍有不同。在这些系统中&#xff0c;通常对应的包是 glibc-devel。 No module named ‘_ctypes’ 使用 yum 安装 glibc-devel 更新系统的软件包列…...

【QT】编写第一个 QT 程序 对象树 Qt 编程事项 内存泄露问题

目录 1. 编写第一个 QT 程序 1.1 使用 标签 实现 &#x1f407; 图形化界面实现 &#x1f407; 纯代码形式实现 1.2 使用 按钮 实现 &#x1f40b; 图形化界面实现 &#x1f40b; 纯代码形式实现 1.3 使用 编辑框 实现 &#x1f95d; 图形化界面实现 &#x1f95…...

VTK编程指南<六>:VTK可视化管线与渲染详解

1、VTK渲染引擎 回顾前几章节的RenderCylinder示例 可以找到以下的类: vtkProp; ytkAbstractMapper; vtkProperty; vtkCamera; vtkLight; vtkRenderer; vtkRenderWindow; vtkRenderWindowInteractor vtkTransform; vtkLookupTable;可以发现这些类都是与数据显示或渲染相关的。…...

基于STM32的智能计步器

引言 随着健康意识的提高&#xff0c;计步器逐渐成为人们日常生活中重要的健康管理工具。本文将指导你如何使用STM32微控制器制作一个智能计步器。该计步器通过加速度传感器检测步伐&#xff0c;并使用OLED显示屏显示步数。通过这个项目&#xff0c;你将学习到STM32开发的基本流…...

[智能体-69]:重新认知MCP:协议不生产智能,只是AI全域交互的标准化基石

MCP只是提供了大模型、编排调度、外部工具能够进行结构化交流的标准&#xff0c;而整个系统的智能主要依赖编排调度&#xff0c;与外部软件系统的交互取决于外部工具&#xff0c;包括外部语音交互、视觉交互、数字化交互。当下MCP&#xff08;Model Context Protocol&#xff0…...

内网环境下Win7系统批量离线补丁部署实战指南

1. 内网Win7补丁部署的挑战与解决方案老旧Win7系统在内网环境中的安全隐患就像漏雨的屋顶&#xff0c;看似不影响日常使用&#xff0c;但随时可能引发严重后果。我经手过几十家单位的系统加固项目&#xff0c;发现这些场景存在三个典型痛点&#xff1a;首先是补丁来源问题&…...

终极鼠标连点器使用指南:3分钟掌握高效自动化技巧

终极鼠标连点器使用指南&#xff1a;3分钟掌握高效自动化技巧 【免费下载链接】MouseClick &#x1f5b1;️ MouseClick &#x1f5b1;️ 是一款功能强大的鼠标连点器和管理工具&#xff0c;采用 QT Widget 开发 &#xff0c;具备跨平台兼容性 。软件界面美观 &#xff0c;操作…...

为什么软件开发偏爱 Linux?深度剖析 Linux 相较于 Windows 的核心优势

引言 在软件开发的世界里&#xff0c;一个有趣的现象是&#xff1a;无论是大型互联网公司的服务器集群&#xff0c;还是资深程序员的个人开发机&#xff0c;Linux 操作系统的身影无处不在。与之形成鲜明对比的是&#xff0c;尽管 Windows 在个人消费市场占据绝对主导地位&…...

光轮智能 谢晨 访谈总结机器人仿真数据产业

光轮智能 谢晨 访谈总结机器人仿真关于创始人关于数据数据金字塔数据痛点仿真数据的重要性仿真数据的质量b站链接地址公司官网关于创始人 清华物理&#xff1b;哥伦比亚金融&#xff1b;英伟达智驾仿真&#xff1b;小鹏智驾仿真&#xff1b;现为光轮智能CEO 关于数据 数据的…...

电信运营商每月处理海量工单,如何不再出错?基于AI Agent的端到端自动化解决方案

在2026年的电信行业&#xff0c;海量工单处理已不再仅仅是效率问题&#xff0c;而是合规与生存的底线。随着2026年5月20日《电信和互联网服务 基础电信企业网上营业厅服务规范》国家标准的正式实施&#xff0c;监管层对“信息透明、流程闭环、计费精准”的要求达到了前所未有的…...

告别元素变动导致的报错:探索自动化测试脚本的 AI“自愈”能力

前言:一个所有测试人都经历过的噩梦 周三晚上十一点,CI/CD流水线再次亮起红灯。 你打开日志,满屏的NoSuchElementException扑面而来。仔细一看——前端团队在昨天的版本中重构了登录页面的DOM结构,原本的#login-btn变成了#signin-button-v2,30个测试用例因此全军覆没。 …...

Unity动态自然系统:Forest Environment-Dynamic Nature深度解析

1. 这不是“贴图堆砌”&#xff0c;而是自然系统级建模&#xff1a;Forest Environment-Dynamic Nature 的真实定位你有没有试过在Unity里拖进几棵树、铺点草、加个天空盒&#xff0c;然后发现场景像一张静止的风景明信片——风不动、叶不摇、雨不落、雾不散&#xff1f;我做过…...

Gazebo Sim多旋翼控制:四轴飞行器动力学建模与PID调参

Gazebo Sim多旋翼控制&#xff1a;四轴飞行器动力学建模与PID调参 【免费下载链接】gz-sim Open source robotics simulator. The latest version of Gazebo. 项目地址: https://gitcode.com/gh_mirrors/gz/gz-sim Gazebo Sim是一款功能强大的开源机器人模拟器&#xff…...

GIS工程应用记录(AI辅助编程)

问题的问题&#xff1a;语境坍缩“从各个角度提出问题&#xff0c;AI做出对应积极答复和修改&#xff0c;结果没有什么变化。”这&#xff0c;就是元问题最核心的症状。你尝试了所有你已知的“高级”协作手段&#xff0c;但就像重拳打在棉花上&#xff0c;AI永远在积极回应&…...