当前位置: 首页 > news >正文

laravel实现AMQP(rabbitmq)生产者以及消费者

基于php-amqplib/php-amqplib组件适配laravel框架的amqp封装库

支持便捷可配置的队列工作模式 官网详情

在此基础上可支持延迟消息、死信队列等机制。

环境要求:

PHP版本: ^7.3|^8.0

需要开启的扩展: socket

其他:

  1. 如果需要实现延迟任务需要安装对应版本的rabbitmq延迟插件,以rabbitmq3.9.0版本为例:
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
cp rabbitmq_delayed_message_exchange-3.9.0.ez /opt/rabbitmq/plugins/
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

用法:

第一步 安装组件:

composer require sai97/laravel-amqp

第二步 发布服务以及配置:

php artisan vendor:publish --provider="Sai97\LaravelAmqp\AmqpQueueProviders"

执行完后会分别在app/config目录下生成amqp.php(amqp连接配置等)、app/QueueJob/DefaultQueueJob.php(默认队列任务)

amqp.php

<?phpuse App\QueueJob\DefaultQueueJob;return ["connection" => ["default" => ["host" => env("AMQP_HOST", "127.0.0.1"),"port" => env("AMQP_PORT", 5672),"user" => env("AMQP_USER", "root"),"password" => env("AMQP_PASSWORD", "root")]],"event" => ["default" => DefaultQueueJob::class,]
];

connection为amqp连接配置,可根据自身业务去调整,完全对应php-amqplib/php-amqplib相关配置项, event是队列实例标识,最好和connection用相同的key以便管理。

目前可支持相关接口项:

 //获取连接名称public function getConnectName(): string;//获取交换机名称public function getExchangeName(): string;//获取交换机类型public function getExchangeType(): string;//获取队列名称public function getQueueName(): string;//获取路由KEYpublic function getRoutingKey(): string;//获取ContentTypepublic function getContentType(): string;//是否开启死信模式public function isDeadLetter(): bool;//获取死信交换机名称public function getDeadLetterExchangeName(): string;//获取死信路由KEYpublic function getDeadLetterRoutingKey(): string;//获取死信队列名称public function getDeadLetterQueueName(): string;//是否开启延迟任务public function isDelay(): bool;//获取延迟任务过期时长public function getDelayTTL(): int;//获取队列附加参数public function getQueueArgs(): array;//获取回调函数public function getCallback(): callable;//是否自动提交ACKpublic function isAutoAck(): bool;

当然你也可以自定义队列实例,只要继承Sai97\LaravelAmqp\Queue基类即可,具体功能配置参数参考Sai97\LaravelAmqp\QueueInterface。

代码示例:

生产者:

$message = "This is message...";
$amqpQueueServices = new AmqpQueueServices(QueueFactory::getInstance(DefaultQueueJob::class));
$amqpQueueServices->producer($message);

消费者:

利用laravel自带的Command去定义一个RabbitMQWorker自定义命令行,仅需要定义一次,后续只需要更改amqp.php配置文件添加不同的队列实例绑定关系即可,以下是RabbitMQWorker演示代码:

<?phpnamespace App\Console\Commands;use Illuminate\Console\Command;
use Sai97\LaravelAmqp\AmqpQueueServices;
use Sai97\LaravelAmqp\QueueFactory;class RabbitMQWorker extends Command
{/*** The name and signature of the console command.** @var string*/protected $signature = 'rabbitmq:worker {event}';/*** The console command description.** @var string*/protected $description = 'rabbitmq worker 消费进程';/*** Create a new command instance.** @return void*/public function __construct(){parent::__construct();}/*** Execute the console command.** @return mixed*/public function handle(){try {$event = $this->argument("event");$eventConfig = config("amqp.event");if (!isset($eventConfig[$event]) || empty($entity = $eventConfig[$event])) {return $this->error("未知的事件: {$event}");}$this->info("rabbitmq worker of event[{$event}] process start ...");$amqpQueueServices = new AmqpQueueServices(QueueFactory::getInstance($entity));$amqpQueueServices->consumer();} catch (\Throwable $throwable) {$event = $event ?? "";$this->error($throwable->getFile() . " [{$throwable->getLine()}]");return $this->error("rabbitmq worker of event[{$event}] process error:{$throwable->getMessage()}");}$this->info("rabbitmq worker of event[{$event}] process stop ...");}
}

完成RabbitMQWorker消费者命令后,我们只需执行php artisan rabbitmq:worker default 完成监听,其中default是可变的,请根据的amqp.php配置中的队列实例绑定标识去输入。

因为队列的消费者都需要是守护进程,所以我们可以依托supervisord进程管理器去定义RabbitMQWorker消费者命令,这样可以保证进程可后台允许以及重启启动等,以下是supervisord.conf配置文件示例:

[program:rabbitmq-worker-default]
#process_name=%(program_name)s_%(process_num)d
process_name=worker_%(process_num)d
numprocs=3
command=/usr/local/bin/php /app/www/laravel8/artisan rabbitmq:worker default
autostart=true
autorestart=true
startretries=3
priority=3
stdout_logfile=/var/log/rabbitmq-worker-default.log
redirect_stderr=true

搭配supervisord来进行管理消费者进程有许多便捷的方面:

  1. 如果需要新增一个队列实例,只需要按照上述格式复制一个program,可以在不影响其他进程的情况下进程更新supervisord配置:
supervisorctl update

    2. 通过配置numprocs参数来设定需要开启多少个相同配置项的消费者worker,这在任务分发、并行处理等场景十分适用,大大提高消费者执行效率。

这里不详细叙述supervisord相关操作,具体可查看supervisord官方文档。

参考链接:https://github.com/Z-Sai/laravel-amqp

相关文章:

laravel实现AMQP(rabbitmq)生产者以及消费者

基于php-amqplib/php-amqplib组件适配laravel框架的amqp封装库 支持便捷可配置的队列工作模式 官网详情 在此基础上可支持延迟消息、死信队列等机制。 环境要求&#xff1a; PHP版本: ^7.3|^8.0 需要开启的扩展: socket 其他: 如果需要实现延迟任务需要安装对应版本的ra…...

LeetCode——二叉树篇(九)

刷题顺序及思路来源于代码随想录&#xff0c;网站地址&#xff1a;https://programmercarl.com 目录 669. 修剪二叉搜索树 108. 将有序数组转换为二叉搜索树 538. 把二叉搜索树转换为累加树 669. 修剪二叉搜索树 给你二叉搜索树的根节点 root &#xff0c;同时给定最小边界…...

uniapp scroll-view横向滚动无效,scroll-view子元素flex布局不生效

要素排查&#xff1a; 1.scroll-x属性需要开启&#xff0c;官方类型是Boolean&#xff0c;实际字符串也行。 2scroll-view标签需要给予一个固定宽度&#xff0c;可以是百分百也可以是固定宽度或者100vw。 3.子元素需要设置display: inline-block&#xff08;行内块元素&#x…...

无涯教程-进程 - 简介

进程间通信就是在不同进程之间传播或交换信息&#xff0c;那么不同进程之间存在着什么双方都可以访问的介质呢?进程的用户空间是互相独立的&#xff0c;一般而言是不能互相访问的&#xff0c;唯一的例外是共享内存区。另外&#xff0c;系统空间是“公共场所”&#xff0c;各进…...

HTML番外篇(四)-HTML5新增元素-CSS常见函数-理解浏览器前缀-BFC

一、HTML5新增元素 1.HTML5语义化元素 在HMTL5之前&#xff0c;我们的网站分布层级通常包括哪些部分呢&#xff1f; header、nav、main、footer ◼ 但是这样做有一个弊端&#xff1a; 我们往往过多的使用div, 通过id或class来区分元素&#xff1b;对于浏览器来说这些元素不…...

机器学习之Adam(Adaptive Moment Estimation)自适应学习率

Adam&#xff08;Adaptive Moment Estimation&#xff09;是一种常用的优化算法&#xff0c;特别适用于训练神经网络和深度学习模型。它是一种自适应学习率的优化算法&#xff0c;可以根据不同参数的梯度信息来动态调整学习率&#xff0c;以提高训练的效率和稳定性。 Adam算法…...

深入理解Linux权限管理:保护系统安全的重要措施

Linux操作系统以其稳定性、可靠性和灵活性而受到广泛使用。其中一个关键特性是其强大的权限管理系统&#xff0c;它可以保护系统资源和用户数据的安全性。本文将深入探讨Linux权限管理的概念、原则和实践&#xff0c;帮助您理解如何正确配置和管理权限&#xff0c;以确保系统的…...

kafka复习:(20):消费者拦截器的使用

一、定义消费者拦截器&#xff08;只消费含"sister"的消息&#xff09; package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.ConsumerInterceptor; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.…...

水库大坝安全监测的主要内容包括哪些?

在水库大坝的实时监测中&#xff0c;主要任务是通过无线传感网络监测各个监测点的水位、水压、渗流、流量、扬压力等数据&#xff0c;并在计算机上用数据模式或图形模式进行实时反映&#xff0c;以掌握整个水库大坝的各项变化情况。大坝安全监测系统能实现全天候远程自动监测&a…...

Cadence软件屏幕显示问题

问题 就是今天打开Cadence软件想导出网表看一下&#xff0c;发现没有显示确定按钮什么的&#xff0c;那个窗口也是无语&#xff0c;不能移动&#xff0c;缩放也只能左右缩放&#xff0c;还不能缩小什么的&#xff0c;真的醉了&#xff0c;后面就是调整窗口的分辨率。 因为我最…...

访问服务器快慢的因素

我们在租用服务器的过程中&#xff0c;可能在访问速度方面&#xff0c;会受到某些因素影响&#xff0c;如果您要进行此项业务&#xff0c;进行一些简单的了解 是非常的有必要的&#xff0c;下面壹基比小鑫带大家一起去做个具体的探讨吧。 对于服务器不太了解的都认为&#xff0…...

vue(element ui安装)

目录 一&#xff0c;element ui安装二&#xff0c;main.js三&#xff0c;使用element ui最后 一&#xff0c;element ui安装 先在盘服中找到你创建的node的位置 如有不懂根据可以看看上一章安装node 然后在终端找到 进入这个位置之后就可以安装了 输入npm i element-ui -S这个…...

基于FPGA视频接口之HDMI2.0编/解码

简介 为什么要特别说明HDMI的版本,是因为HDMI的版本众多,代表的HDMI速度同样不同,当前版本在HDMI2.1速度达到48Gbps,可以传输4K及以上图像,但我们当前还停留在1080P@60部分,且使用的芯片和硬件结构有很大差别,故将HDMI分为两个部分说明1080@60以下分辨率和4K以上分辨率(…...

Codeforces Round #894 (Div.3)

文章目录 前言A. Gift Carpet题目&#xff1a;输入&#xff1a;输出&#xff1a;思路&#xff1a;代码&#xff1a; B. Sequence Game题目&#xff1a;输入&#xff1a;输出&#xff1a;思路&#xff1a;代码&#xff1a; C. Flower City Fence题目&#xff1a;输入&#xff1a…...

MyBatid动态语句且模糊查询

目录 什么是MyBtais动态语句&#xff1f;&#xff1f;&#xff1f; MyBatis常用的动态标签和表达式 if标签 Choose标签 where标签 MyBatis模糊查询 #与$的区别 ​编辑 MyBatis映射 resultType resultMap 什么是MyBtais动态语句&#xff1f;&#xff1f;&#xff1f;…...

JVM——垃圾回收器G1+垃圾回收调优

4.4 G1&#xff08;一个垃圾回收器&#xff09; 定义: 取代了CMS垃圾回收器。和CMS一样时并发的。 适用场景: 物理上分区&#xff0c;逻辑上分代。 相关JVM参数: -XX:UseG1GC-XX:G1HeapRegionSizesize-XX:MaxGCPauseMillistime 1) G1 垃圾回收阶段 三个回收阶段&#xff0…...

【SA8295P 源码分析】23 - QNX Ethernet MAC 驱动 之 emac1_config.conf 配置文件解析

【SA8295P 源码分析】23 - QNX Ethernet MAC 驱动 之 emac1_config.conf 配置文件解析 系列文章汇总见:《【SA8295P 源码分析】00 - 系列文章链接汇总》 本文链接:《【SA8295P 源码分析】23 - QNX Ethernet MAC 驱动 之 emac1_config.conf 配置文件解析》 主要参数如下: hw_…...

iptables的使用规则

环境中为了安全要限制swagger的访问&#xff0c;最简单的方式是通过iptables防火墙设置规则限制。 在测试服务器中设置访问swagger-ui.html显示如下&#xff0c;区分大小写&#xff1a; iptables设置限制访问9783端口的swagger字段的请求&#xff1a; iptables -A INPUT -p t…...

JS 动画 vs CSS 动画:究竟有何不同?

在 Web 前端开发中&#xff0c;动画是提高用户体验的关键因素之一。我们通常可以使用 JavaScript&#xff08;JS&#xff09;和 CSS 来创建动画效果。但是&#xff0c;这两者之间有哪些区别呢&#xff1f;在本文中&#xff0c;我们将深入研究 JS 动画和 CSS 动画&#xff0c;探…...

供应链 | 大数据报童模型:基于机器学习的实践见解

论文解读&#xff1a;李欣 马玺渊 作者&#xff1a;Gah-Yi Ban, Cynthia Rudin 引用&#xff1a;Ban, Gah-Yi and Cynthia Rudin. The big data newsvendor: Practical insights from machine learning. Operations Research 67.1 (2019): 90-108. 文章链接&#xff1a;https…...

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...

SkyWalking 10.2.0 SWCK 配置过程

SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外&#xff0c;K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案&#xff0c;全安装在K8S群集中。 具体可参…...

k8s从入门到放弃之Ingress七层负载

k8s从入门到放弃之Ingress七层负载 在Kubernetes&#xff08;简称K8s&#xff09;中&#xff0c;Ingress是一个API对象&#xff0c;它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress&#xff0c;你可…...

蓝桥杯 2024 15届国赛 A组 儿童节快乐

P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡&#xff0c;轻快的音乐在耳边持续回荡&#xff0c;小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下&#xff0c;六一来了。 今天是六一儿童节&#xff0c;小蓝老师为了让大家在节…...

Python爬虫(一):爬虫伪装

一、网站防爬机制概述 在当今互联网环境中&#xff0c;具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类&#xff1a; 身份验证机制&#xff1a;直接将未经授权的爬虫阻挡在外反爬技术体系&#xff1a;通过各种技术手段增加爬虫获取数据的难度…...

C++.OpenGL (10/64)基础光照(Basic Lighting)

基础光照(Basic Lighting) 冯氏光照模型(Phong Lighting Model) #mermaid-svg-GLdskXwWINxNGHso {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-GLdskXwWINxNGHso .error-icon{fill:#552222;}#mermaid-svg-GLd…...

自然语言处理——循环神经网络

自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元&#xff08;GRU&#xff09;长短期记忆神经网络&#xff08;LSTM&#xff09…...

《C++ 模板》

目录 函数模板 类模板 非类型模板参数 模板特化 函数模板特化 类模板的特化 模板&#xff0c;就像一个模具&#xff0c;里面可以将不同类型的材料做成一个形状&#xff0c;其分为函数模板和类模板。 函数模板 函数模板可以简化函数重载的代码。格式&#xff1a;templa…...

AGain DB和倍数增益的关系

我在设置一款索尼CMOS芯片时&#xff0c;Again增益0db变化为6DB&#xff0c;画面的变化只有2倍DN的增益&#xff0c;比如10变为20。 这与dB和线性增益的关系以及传感器处理流程有关。以下是具体原因分析&#xff1a; 1. dB与线性增益的换算关系 6dB对应的理论线性增益应为&…...

【Redis】笔记|第8节|大厂高并发缓存架构实战与优化

缓存架构 代码结构 代码详情 功能点&#xff1a; 多级缓存&#xff0c;先查本地缓存&#xff0c;再查Redis&#xff0c;最后才查数据库热点数据重建逻辑使用分布式锁&#xff0c;二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...