当前位置: 首页 > news >正文

Redis:基于PubSub(发布/订阅)、Stream流实现消息队列

Redis - PubSub、Stream流

文章目录

  • Redis - PubSub、Stream流
    • 1.基于List的消息队列
    • 2.基于PubSub的消息队列
    • 3.基于Stream的消息队列
      • 1.Redis Streams简介
      • 2.Redis Streams基本命令
        • 1.XADD 添加消息到末尾
        • 2.XLEN 获取消息长度
        • 3.XREAD 读取消息 (单消费模式)
        • 4.XGROUP 消费组操作
        • 5.XREADGROUP GROUP 从消费组读取消息
        • 6.XACK 消息确认
        • 7.XPENDING 查看pend数据

1.基于List的消息队列

  • 由于redis的list数据结构为双向链表,则可以通过lpush和rpop来模拟队列效果
  • 由于队列没有消息时候,需要阻塞获取队列数据,而lpop和rpop在空队列获取数据时会返回null,所以需要使用brpop和blpop来进行阻塞获取
#向data1的list存两个数据
lpush data1  aaa bbb
#右监听data1  等待20秒
brpop data1 20

在这里插入图片描述

缺点:

  • 无法避免消息丢失,
  • 只支持单消费者,无法广播

2.基于PubSub的消息队列

基于发布订阅形式,可以广播,生产者向channel(信道)发送消息,可以由多个消息者去订阅,订阅的消费者都可以收到消息

 SUBSCRIBE channel [channel ...]  #订阅一个或多个信道
 PUBLISH channel message  #向一个信道发送消息
 PSUBSCRIBE pattern [pattern ...]  #通过通配符匹配订阅的信道  匹配规则  ?代表一个字符   []代表中括号内的可选字符  *代表任意字符
SUBSCRIBE  log 
PUBLISH  log zhangsan   

在这里插入图片描述

缺点:

  • 不支持持久化
  • 消息有上限,超出会导致消息丢失

3.基于Stream的消息队列

1.Redis Streams简介

官方文档:https://redis.io/docs/latest/commands/xadd/

Redis Stream是redis在5.x版本引入的新特性,Redis流是一种数据结构,它类似于一个只可追加的日志,但也实现了多种操作,以克服典型只可追加日志的一些限制。这些操作包括O(1)时间的随机访问和复杂的消费策略,如消费者组。你可以使用流来记录并同时实时分发事件。Redis流的使用案例包括:

  • 事件溯源(例如,跟踪用户操作、点击等)
  • 传感器监测(例如,现场设备的读数)
  • 通知(例如,将每个用户的通知记录存储在单独的流中)

Redis为每个流条目生成一个唯一的ID。你可以使用这些ID在后续检索与其关联的条目,或者读取并处理流中的所有后续条目。请注意,由于这些ID与时间相关,这里显示的ID可能会有所不同,与你自己的Redis实例中看到的ID也会有所不同。
Redis流支持多种修剪策略(以防止流无限制地增长)和多种消费策略(参见XREAD、XREADGROUP和XRANGE)。

2.Redis Streams基本命令

stream消息队列相关命令:

  • XADD - 添加消息到末尾
  • XTRIM - 对流进行修剪,限制长度
  • XDEL - 删除消息
  • XLEN - 获取流包含的元素数量,即消息长度
  • XRANGE - 获取消息列表,会自动过滤已经删除的消息
  • XREVRANGE - 反向获取消息列表,ID 从大到小
  • XREAD - 以阻塞或非阻塞方式获取消息列表

消费者组相关命令:

  • XGROUP CREATE - 创建消费者组
  • XGROUP CREATECONSUMER 给指定的消费者组添加消费者
  • XREADGROUP GROUP - 读取消费者组中的消息
  • XACK - 将消息标记为"已处理"
  • XGROUP SETID - 为消费者组设置新的最后递送消息ID
  • XGROUP DELCONSUMER - 删除消费者
  • XGROUP DESTROY - 删除消费者组
  • XPENDING - 显示待处理消息的相关信息
  • XCLAIM - 转移消息的归属权
  • XINFO - 查看流和消费者组的相关信息;
  • XINFO GROUPS - 打印消费者组的信息;
  • XINFO STREAM - 打印流信息
1.XADD 添加消息到末尾

1.基本语法

XADD是唯一可以向流中添加数据的 Redis 命令,但 还有其他命令,例如 XDELXTRIM,能够 从流中删除数据。

XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...]
  • key: 队列名
  • [NOMKSTREAM]:队列不存在是否自动创建,默认自动创建
  • [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] :设置消息队列的最大消息数量
  • <* | id>:消息唯一id,*代表消息由redis自动生成,格式为时间戳-递增序列,可以手动指定
  • field value:字段和值(键值对),可以一次添加多个
XADD users * name zhangsan age 18  #向user发送一条name为zhangsan,age为18的消息,返回消息id

在这里插入图片描述

2.指定stream的id参数

1526919030474-55

ID标识流中的给定消息数据。
如果指定的ID参数是*字符,XADD命令将自动生成一个唯一的ID。然而,尽管仅在极少数情况下有用,但可以指定一个格式良好的ID,以便新条目将与指定的ID完全相同。

XADD mystream 1526919030474-55 message "Hello," 

当自动生成ID时,第一部分是Redis实例生成ID的Unix时间(毫秒)。第二部分只是一个序列号,用于区分在同一毫秒内生成的ID。

XADD mystream 1526919030474-* message " World!"

还可以指定一个不完整的ID,只自动生成序列号部分(注意:6.0版本不支持报错

stream的数据是有序的,所以消息的id始终的递增的,如果手动指定一个小于上一条数据的id则会出错

2.XLEN 获取消息长度
XLEN users  #返回消息个数
3.XREAD 读取消息 (单消费模式)

基础语法

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
  • [COUNT count] :每次读取的最大数量
  • [BLOCK milliseconds]:当消息没有时,是否阻塞,阻塞时间(毫秒),不阻塞就不给值,如果给0则永久阻塞等待
  • STREAMS key: 从那个队列读取消息,key为读取的队列名
  • id [id ...]:起始id,代表从那个id的消息开始读取;0代表从第一个$代表从最新的消息读取

测试

 xread count 1 streams users 0  #读取users中最开始的一条数据

在这里插入图片描述

消息读取后不会删除,所有消费者都可以重复获取

 xread count 1 streams users $  #读取最新消息 ,返回为nil空xread count 1 block 0 streams users $    #永久阻塞读取

在这里插入图片描述

但是阻塞方式监听到消息后会关闭,需要重新监听

此时在开发中我们可以使用死循环来无限读取最新消息进行监听

但是: 当指定起始id为$时,代表读取最新消息,如果处理消息过程中,又有超过一条以上的消息到达,则下次也只能获取一条最新的消息会导致其他数据漏读

4.XGROUP 消费组操作
  • 消费者组:将多个消费者划分到同一个消费组,监听同一个队列

  • 分流消费:队列中的消费将会分流给消费者组中的消费者,不会重复消费,加快消息消费速度

  • 消息标识:消费者组会维护一个标识,记录最后一个被处理(非最新)的消息,即使redis挂机重启,也可以按照标识恢复读取,确保消息消费

  • 消息确认机制:消费者获取消息后,消息处于pending状态,并存入一个pend-list,当处理完成时通过XACK来确认消息,标记为已处理,才pend-list移除

XGROUP CREATE 创建消费者组

XGROUP CREATE key group <id | $> [MKSTREAM] [ENTRIESREAD entries-read]
  • key:队列名称
  • group: 消费者组名称
  • <id | $>:起始id标识, 0代表第一个, $代表最新消息
  • [MKSTREAM]:队列不存在时自动创建队列,如果不存在且不指定会报错
  • [ENTRIESREAD entries-read]: redis7.0后的参数,

创建消费者组g1

XGROUP CREATE users g1 0

在这里插入图片描述

XGROUP CREATECONSUMER 给指定的消费者组添加消费者

XGROUP CREATECONSUMER key group consumer

XGROUP DESTROY 删除指定的消费者组

XGROUP DESTROY key group

XGROUP DELCONSUMER 删除消费者组中指定的消费者

XGROUP DELCONSUMER key group consumer
5.XREADGROUP GROUP 从消费组读取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
  • group: 消费者组名称
  • consumer:消费者名称,如果不存在会自动创建
  • [COUNT count] :每次读取的最大数量
  • [BLOCK milliseconds]:当消息没有时,是否阻塞,阻塞时间(毫秒),不阻塞就不给值,如果给0则永久阻塞等待
  • STREAMS key: 从那个队列读取消息,key为读取的队列名
  • [NOACK] 无需消息确认(类似自动确认)。
  • id [id ...]:起始id

**注意:**id取值:">" :从下一个未消费的消息开始,非最新消息,确保都消费
其他数字:根据指定id从pend-list中获取已消费但未确认消息,例如0,从pend-list第一个消息开始

所以当正常处理时的id都采用">" 进行消费,如果出现异常可以指定0,每次都读取第一个pend-list的消息,即每次都是读取最新的未处理数据,将异常数据处理掉

测试

XREADGROUP GROUP  g1 c1 COUNT 1 BLOCK 2000 STREAMS users >

在这里插入图片描述

6.XACK 消息确认
XACK key group id [id ...]

测试

XACK users g1 1733738565351-0 1733738570018-0 1733738567511-0 1733738587327-0

在这里插入图片描述

7.XPENDING 查看pend数据
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
  • key:队列名称
  • group: 消费者组名称
  • [IDLE min-idle-time]::查看过去空闲时间的以上的消息,比如给5000,则查询空闲时间5000ms以上的消息
  • start end 消息起始范围 “- +”代表所有
  • count: 获取数量
  • [consumer]: 获取那个消费者的

测试

XPENDING users g1 - + 10

在这里插入图片描述

参考来源:https://www.bilibili.com/video/BV1cr4y1671t/?spm_id_from=333.788.videopod.episodes&vd_source=97a7d9497f7eb9e537f6b50df8831e27&p=75

相关文章:

Redis:基于PubSub(发布/订阅)、Stream流实现消息队列

Redis - PubSub、Stream流 文章目录 Redis - PubSub、Stream流1.基于List的消息队列2.基于PubSub的消息队列3.基于Stream的消息队列1.Redis Streams简介2.Redis Streams基本命令1.XADD 添加消息到末尾2.XLEN 获取消息长度3.XREAD 读取消息 &#xff08;单消费模式&#xff09;4…...

C#飞行棋(新手简洁版)

我们要在主函数的顶部写一些全局静态字段 确保能在后续的静态方法中能够获取到这些值和修改 static int[] Maps new int[100];static string[] PlayerName new string[2];static int[] PlayerScore new int[2];static bool[] Flags new bool[2] {true,true }; static int[]…...

【OpenCV】图像转换

理论 傅立叶变换用于分析各种滤波器的频率特性。对于图像&#xff0c;使用 2D离散傅里叶变换&#xff08;DFT&#xff09; 查找频域。快速算法称为 快速傅立叶变换&#xff08;FFT&#xff09; 用于计算DFT。 Numpy中的傅立叶变换 首先&#xff0c;我们将看到如何使用Numpy查…...

力扣 重排链表-143

重排链表-143 /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}* ListNode(int x) : val(x), next(nullptr) {}* ListNode(int x, ListNode *next) : val(x), next(next)…...

【Kubernetes理论篇】容器集群管理系统Kubernetes(K8S)

Kubernetes集群部署基本管理实战 这么好的机会&#xff0c;还在等什么&#xff01; 01、Kubernetes 概述 K8S是什么 K8S 的全称为 Kubernetes (K12345678S)&#xff0c;PS&#xff1a;“嘛&#xff0c;写全称也太累了吧&#xff0c;写”。不如整个缩写 K8s 作为缩写的结果…...

Kubernetes 常用操作大全:全面掌握 K8s 基础与进阶命令

Kubernetes&#xff08;简称 K8s&#xff09;作为一种开源的容器编排工具&#xff0c;已经成为现代分布式系统中的标准。它的强大之处在于能够自动化应用程序的部署、扩展和管理。在使用 Kubernetes 的过程中&#xff0c;熟悉常用操作对于高效地管理集群资源至关重要。本文将详…...

爬虫基础之Web网页基础

网页的组成 网页可以分为三大部分–HTML、CSS 和 JavaScript。如果把网页比作一个人&#xff0c;那么 HTML 相当于骨架、JavaScript 相当于肌肉、CSS 相当于皮肤&#xff0c;这三者结合起来才能形成一个完善的网页。下面我们分别介绍一下这三部分的功能。 HTML HTML(Hypertext…...

k8s, deployment

控制循环&#xff08;control loop&#xff09; for {实际状态 : 获取集群中对象X的实际状态&#xff08;Actual State&#xff09;期望状态 : 获取集群中对象X的期望状态&#xff08;Desired State&#xff09;if 实际状态 期望状态{什么都不做} else {执行编排动作&#xf…...

使用ensp搭建OSPF+BGP和静态路由,底层PC使用dhcp,实现PC互通

1.4种方式&#xff0c;实现PC2可以互通底层的所有设备 OSPF&#xff1a;OSPF是一种用于互联网协议网络的链路状态路由协议 BGP&#xff1a;是一种用于互联网上进行路由和可达性信息传递的外部网关协议&#xff08;EGP&#xff09; 静态路由&#xff1a; 静态路由是一种路由方…...

TÜLU 3: Pushing Frontiers in Open Language Model Post-Training

基本信息 &#x1f4dd; 原文链接: https://arxiv.org/abs/2411.15124&#x1f465; 作者: Nathan Lambert, Jacob Morrison, Valentina Pyatkin, Shengyi Huang, Hamish Ivison, Faeze Brahman, Lester James V. Miranda, Alisa Liu, Nouha Dziri, Shane Lyu, Yuling Gu, Sau…...

深入解读 MySQL EXPLAIN 与索引优化实践

MySQL 是当今最流行的关系型数据库之一&#xff0c;为了提升查询性能&#xff0c;合理使用 EXPLAIN 工具和优化索引显得尤为重要。本文将结合实际示例&#xff0c;探讨如何利用 EXPLAIN 分析查询执行计划&#xff0c;并分享索引优化的最佳实践。 一、EXPLAIN 工具简介 EXPLAIN …...

Flume——进阶(agent特性+三种结构:串联,多路复用,聚合)

目录 agent特性ChannelSelector描述&#xff1a; SinkProcessor描述&#xff1a; 串联架构结构图解定义与描述配置示例Flume1&#xff08;监测端node1&#xff09;Flume3&#xff08;接收端node3&#xff09;启动方式 复制和多路复用结构图解定义描述配置示例node1node2node3启…...

ragflow连ollama时出现的Bug

ragflow和ollama连接后&#xff0c;已经添加了两个模型但是ragflow仍然一直warn&#xff1a;Please add both embedding model and LLM in Settings &#xff1e; Model providers firstly.这里可能是我一开始拉取的镜像容器太小&#xff0c;容不下当前添加的模型&#xff0c;导…...

基于centos7.7编译Redis6.0

背景&#xff1a; OS&#xff1a;CentOs 7.7 Redis: 6.0.6 编译构建报错如下&#xff1a; In file included from server.c:30:0: server.h:1044:5: error: expected specifier-qualifier-list before ‘_Atomic’_Atomic unsigned int lruclock; /* Clock for LRU eviction …...

uni-app项目无法在Android Studio模拟器上运行

目录 1 问题描述2 尝试解决3 引发原因4 解决方法4.1 换用 MuMu 模拟器 5 结语 1 问题描述 在使用 uni-app 开发 Pad 端 App 时&#xff0c;初始化项目后打算先运行一下确保初始化正常。打开 Android Studio 模拟器后&#xff0c;然后在 HbuilderX 中选择使用 App 标准基座 运…...

第一部分:Linux系统(基础及命令)

Linux操作系统的实操性非常强&#xff0c;纯操作&#xff0c;不适用于日常的办公使用 1.初始Linux 1.1 操作系统概述 1.1.1 了解OS的作用 OS&#xff1a;是计算机软件的一种&#xff0c;主要负责&#xff1a;作为用户和计算机硬件之间的桥梁&#xff0c;调度和管理计算机硬…...

No module named ‘_ssl‘ No module named ‘_ctypes‘

如果你使用的是基于 yum 的 Linux 发行版&#xff08;例如 CentOS、RHEL、Fedora&#xff09;&#xff0c;安装 libc6-dev 的方式稍有不同。在这些系统中&#xff0c;通常对应的包是 glibc-devel。 No module named ‘_ctypes’ 使用 yum 安装 glibc-devel 更新系统的软件包列…...

【QT】编写第一个 QT 程序 对象树 Qt 编程事项 内存泄露问题

目录 1. 编写第一个 QT 程序 1.1 使用 标签 实现 &#x1f407; 图形化界面实现 &#x1f407; 纯代码形式实现 1.2 使用 按钮 实现 &#x1f40b; 图形化界面实现 &#x1f40b; 纯代码形式实现 1.3 使用 编辑框 实现 &#x1f95d; 图形化界面实现 &#x1f95…...

VTK编程指南<六>:VTK可视化管线与渲染详解

1、VTK渲染引擎 回顾前几章节的RenderCylinder示例 可以找到以下的类: vtkProp; ytkAbstractMapper; vtkProperty; vtkCamera; vtkLight; vtkRenderer; vtkRenderWindow; vtkRenderWindowInteractor vtkTransform; vtkLookupTable;可以发现这些类都是与数据显示或渲染相关的。…...

基于STM32的智能计步器

引言 随着健康意识的提高&#xff0c;计步器逐渐成为人们日常生活中重要的健康管理工具。本文将指导你如何使用STM32微控制器制作一个智能计步器。该计步器通过加速度传感器检测步伐&#xff0c;并使用OLED显示屏显示步数。通过这个项目&#xff0c;你将学习到STM32开发的基本流…...

VB.NET 从入门到精通:开启编程进阶之路

摘要&#xff1a; 本文全面深入地阐述了 VB.NET 的学习路径&#xff0c;从基础的环境搭建与语法入门开始&#xff0c;逐步深入到面向对象编程、图形用户界面设计、数据访问、异常处理、多线程编程以及与其他技术的集成等核心领域&#xff0c;通过详细的代码示例与理论讲解&…...

射频电路屏蔽简略

电磁波的干扰是每个射频设备的自带属性&#xff0c;不管是内部还是外部&#xff0c;怎样去更好的抑制掉干扰&#xff0c;关系到射频设备的工作状态&#xff0c;而能够找到产生干扰的来源就是重中之重&#xff0c;电磁波的干扰与其产生的源密不可分&#xff0c;而源就离不开所需…...

基础算法——搜索与图论

搜索与图论 图的存储方式2、最短路问题2.1、Dijkstra算法&#xff08;朴素版&#xff09;2.2、Dijkstra算法&#xff08;堆优化版&#xff09;2.3、Bellman-Ford算法2.4、SPFA求最短路2.5、SPFA判负环2.6、Floyd算法 图的存储方式 2、最短路问题 最短路问题可以分为单源最短路…...

redis优化编码之字符串

redis 优化编码之字符串 ### 字符串优化 字符串对象是redis内部最常用的数据类型。 所有的键是字符串对象值对象除了整数之外都是使用字符串存储lpush cache:type "redis" "tair" "memcache" "leveldb"创建如上一个链表 需要创建一…...

Python特定版本的安装/卸载/环境配置,Spyder安装教程

目录 1.Python安装 1.1 Python下载 1.2 下载特定版本 1.3 安装Python 1.4 修改安装 1.5 环境配置 1.6 卸载Python 2.Spyder安装使用 2.1 Spyder下载 2.1.1 官网下载Spyder 2.2.2 Github下载Spyder 2.2 安装 参考资料&#xff1a;网盘 1.Python安装 1.1 Python下载…...

全局搜索正则表达式(grep)

一.grep简介 grep 全程Globally search a Regular Expression and Print&#xff0c;是一种强大的文本搜索工具&#xff0c;它能使用特定模式匹配&#xff08;包括正则表达式&#xff09;搜索文本&#xff0c;并默认输出匹配行。Unix的grep家族包括grep和egrep 二.grep的工作…...

linux-12 关于shell(十一)ls

登录系统输入用户名和密码以后&#xff0c;会显示给我们一个命令提示符&#xff0c;就意味着我们在这里就可以输入命令了&#xff0c;给一个命令&#xff0c;这个命令必须要可执行&#xff0c;那问题是我的命令怎么去使用&#xff0c;命令格式有印象吗&#xff1f;在命令提示符…...

编写指针函数使向右循环移动m个位置

题目描述:有n个整数&#xff0c;要求你编写一个函数使其向右循环移动m个位置 请仔细阅读右侧代码&#xff0c;结合相关知识&#xff0c;在Begin-End区域内进行代码补充。 输入 输入n m表示有n个整数&#xff0c;移动m位 输出 输出移动后的数组 样例输入&#xff1a; 10 5 1 2 3…...

xvisor调试记录

Xvisor是一种开源hypervisor,旨在提供完整、轻量、移植且灵活的虚拟化解决方案,属于type-1类型的虚拟机,可以直接在裸机上启动。 启动xvisor步骤: 1、搭建riscv编译环境 首先从github上下载riscv-gnu-toolchain很费劲,建议直接从国内的源下载 git clone https://gitee…...

MongoDB-ObjectID 生成器

前言 MongoDB中一个非常关键的概念就是 ObjectID&#xff0c;它是 MongoDB 中每个文档的默认唯一标识符。了解 ObjectID 的生成机制不仅有助于开发人员优化数据库性能&#xff0c;还能帮助更好地理解 MongoDB 的设计理念。 什么是 MongoDB ObjectID&#xff1f; 在 MongoDB …...

wordpress自动添加动态内容/关键词搜索热度查询

Fiddler中的inspector的意思是“检查器”的意思&#xff0c;用来显示界面左侧抓取到的数据列表中选定数据的请求和响应。 1 打开inspector 可以使用两种方法来打开Fiddler的inspector&#xff0c;一种是在数据列表中双击某个数据就可以打开右侧面板&#xff0c;在右侧面板中选…...

网站制作要多长时间/百度知道网页版

Redis 三主三从集群搭建方式一&#xff1a;源码安装下载源码包&#xff1a;wget http://download.redis.io/releases/redis-3.2.10.tar.gz解压并进入目录&#xff1a;tar xf redis-3.2.10 && cd redis-3.2.10makeredis命令做软连接ln -s /root/redis-3.2.10/src/* /usr…...

手机移动端网站怎么做/推广普通话宣传周

欢迎观看 Microsoft Excel 教程&#xff0c;小编带大家学习 Microsoft Excel 的使用技巧&#xff0c;了解如何在 Excel 图表中添加或删除次坐标轴。 如果图表中的格式从数据系列到数据系列跨度很大&#xff0c;可以在次坐标轴上绘制一个或多个数据系列。 如果同一个图表中有混…...

网站制作公司茂名/无限制访问国外的浏览器

最近打开了交接过来的旧代码&#xff0c;编译了一下&#xff0c;出现以下错误&#xff1a; Error[Li005]: no definition for "__disable_interrupt" Error[Li005]: no definition for "__enable_interrupt"解决方法&#xff1a;添加头文件#include <…...

免费做网站tk/百度指数批量获取

第三章 常用命令1、mkdir : 创建目录&#xff0c;make directorys&#xff0c;-p 递归创建目录mkdir-p /a/b/c2、ls : -l(long)d(directory)显示目录或文件&#xff0c;全称list-l#列出文件的详细信息&#xff0c;如创建者&#xff0c;创建时间&#xff0c;文件的读写权限列表…...

网站建设客户说没用/广告网

1.“我可以向你问路吗?” “到那里?” “到你心里。” 2.“我可以向你借一块钱吗?” “为什么?” “我想打电话告诉我妈&#xff0c;我刚遇到我的梦中情人。”或“我要打电话给你妈妈谢谢她。” 3.“你爸爸是小偷吗?” “不是。” “那他怎么能把灿烂的星星偷来放在你双眸…...