当前位置: 首页 > news >正文

Celery(分布式任务队列)入门学习笔记

Celery 的简单介绍

用 Celery 官方的介绍:它是一个分布式任务队列; 简单,灵活,可靠的处理大量消息的分布式系统; 它专注于实时处理,并支持任务调度。

Celery 如果使用 RabbitMQ 作为消息系统的话,整个应用体系就是下面这张图

Celery 官方给出的 Hello World, 对于未接触它的人来说根本就不知道是什么

1

2

3

4

5

6

7

from celery import Celery

app = Celery('hello', broker='amqp://guest@localhost//')

@app.task

def hello():

    return 'hello world'

还是有必要按住上面那张图看 Celery 的组成部分

  1. Celery 自身实现的部分其实是 Producer 和 Consumer. Producer 创建任务,并发送消息到消息队列,我们称这个队列为 Broker。Consumer 从 Broker 中接收消息,完成计算任务,把结果存到 Backend
  2. Broker 就是那个消息队列,可选择的实现有 RabbitMQ, Redis, Amazon SQS
  3. 结果存储(Backend), 可选择 AMQP(像 RabbitMQ 就是它的一个实现), Redis, Memcached, Cassandra, Elasticsearch, MongoDB, CouchDB, DynamoDB, Amazon S3, File system 等等,看来它的定制性很强
  4. 消息和结果的存储还涉及到一个序列化的问题,可选择 pickle(Python 专用), json, yaml, msgpack. 消息可用 zlib, bzip2 进行压缩, 或加密存储
  5. Worker 的并发可采用 prefork(多进程), thread(多线程), Eventlet, gevent, solo(单线程)]

Celery 应用的基础选型

Celery 的 Broker 和 Backend 有非常多的选择组合,RabbitMQ 和 Redis 都是即可作为 Broker 又能用作 Backend。但 Celery 的推荐是用 RabbitMQ 作为 Broker, 小的结果这里选择用 Redis 作为 Backend, 所以这里的选型是

  1. Broker: RabbitMQ
  2. Backend: Redis
  3. 序列化:JSON  -- 方便在学习中查到消息中的数据
准备 Redis

安装 Python 包

在需要运行 Producer 和 Consumer(worker) 的机器上创建一个 Python 虚拟环境,然后安装下面的包

$ pip install celery redis

实践中只需要安装 celery redis 就能运行后面的例子,没有安装 librabbitmq, "celery[librabbitmq]" 也行,安装了这两个库能使用更高效的 librabbitmq C 库。如果安装了 librabbitmq 库,broker='amqp://...'  默认使用 librabbitmq, 找不到 librabbitmq 的话就用 broker='pyamqp://...'

$ pip install librabbitmq
$ pip install "celery[librabbitmq]"

注:中括号中的是安装 Celery 提供的 bundle, 它定义在 setup.py 的 setup 函数中的 extras_require。

Celery 应用实战

我们不用 Celery 的 Hello World 实例,那不能帮助我们理解背后发生了什么。创建一个 tasks.py 文件

1

2

3

4

5

6

7

8

9

10

11

from celery import Celery

app = Celery('celery-demo',

                broker='amqp://celery:your-password@192.168.86.181:5672/',

                backend='redis://192.168.86.181:6379')

@app.task

def add(x, y):

    return x + y

这里配置连接到 brocker 的 / vhost, 如果连接到别的 vhost, 如 celery 的话, url 写成 amqp://celery:your-passoword@192.168.86.181:5672/celery. backend 的 redis 如果要配置密码, 和 db 的话,写成 redis://:password@192.168.86.181:6379/2

暂且不在该脚本中直接执行 add.delay(15, 30), 而是放到 Python 控制台下方便测试

现在进到 Python 控制台

1

2

3

4

5

6

>>> from tasks import add

>>> task = add.delay(15, 30)

>>> task.id

'c3552fa2-502a-450b-933b-19a1da65ba33'

>>> task.status

'PENDING'

由于 Worker 还没有启动,所以得到一个 task_id, 状态是 PENDING。趁这时候看看 Celery 目前做了什么,来查看到 RabbitMQ

7

celery direct

Celery 在 RabbitMQ 中创建了的资源有

  1. 一个 Exchange: celery direct
  2. 两个 binding: 送到默认(空字符串)或 celery exchange 的, routing-key 为 celery 的消息会转发到队列 celery 中
  3. 一个队列 celery

查看队列 celery 中的消息

1

2

3

4

5

6

vagrant@celery:~$ rabbitmqadmin get queue=celery ackmode=ack_requeue_true

+-------------+----------+---------------+-------------------------------------------------------------------------------------+---------------+------------------+-------------+

| routing_key | exchange | message_count |                                       payload                                       | payload_bytes | payload_encoding | redelivered |

+-------------+----------+---------------+-------------------------------------------------------------------------------------+---------------+------------------+-------------+

| celery      |          | 0             | [[15, 30], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}] | 83            | string           | False       |

+-------------+----------+---------------+-------------------------------------------------------------------------------------+---------------+------------------+-------------+

ackmode=ack_requeue_true, 所以消息仍然在队列中, Redis 中什么也还没发生,接下来要

启动 Celery Worker

要用到 celery 命令,不过只要是 Python 的程序,命令行能做的事情总是能用 Python 代码来执行,用 celery --help 可看它的详细说明。

$ celery -A tasks worker -l INFO

tasks 是自己创建的模块文件 tasks.py

这时候显示出一条绿绿的芹菜出来了,所以得用屏幕截图来表现

取出消息并显示任务执行完成,这时候去看 RabbitMQ 的队列 celery 中的消息不见了,启动 Worker 后也会在 RabbitMQ 中创建 queue, 及对应的 binding, exchange。

再回到提交任务的 Python 控制台

1

2

3

4

>>> task.status

'SUCCESS'

>>> task.result

45

一个 Celery 全套服务圆满完成。结果存在了 Redis 中

192.168.86.181:6379> keys *
1) "celery-task-meta-c3552fa2-502a-450b-933b-19a1da65ba33"
192.168.86.181:6379> TTL celery-task-meta-c3552fa2-502a-450b-933b-19a1da65ba33
(integer) 85840
192.168.86.181:6379> get celery-task-meta-c3552fa2-502a-450b-933b-19a1da65ba33
"{\"status\": \"SUCCESS\", \"result\": 45, \"traceback\": null, \"children\": [], \"date_done\": \"2022-01-17T07:23:48.901999\", \"task_id\": \"c3552fa2-502a-450b-933b-19a1da65ba33\"}"

Redis 中的结果保存时长为 24 小时,失败的任务会记录下异常信息。

关于 Worker 的控制查看帮助 celery worker --help, 比如

  1. -c, --concurrency: 并发数,默认为系统中 CPU 的内核数
  2. -P, --pool [prefork|eventlet|gevent|solo|processes|threads]:  worker 池的实现方式
  3. --max-tasks-per-child INTEGER: worker 执行的最大任务数,达到最大数目后便重启当前 worker
  4. -Q, --queues: 指定处理任务的队列名称,逗号分隔

任务的状态变迁是:PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

Celery 的配置

除了在声明 Celery 对象时可以指定 broker, backend 属性之外,我们可以用 py 配置文件的形式来配置更多的内容,配置文件 celeryconfig.py, 内容是 Configuration and defautls 中列出的项目

比如 celeryconfig.py

1

2

3

4

5

6

7

broker_url = 'amqp://celery:your-password@192.168.86.181:5672/'

result_backend = 'redis://192.168.86.181:6379'

task_serializer = 'json'

result_serializer = 'json'

accept_content = ['json']

timezone = 'America/Chicago'

enable_utc = True

新的格式是用小写的,旧格式用大写,如 BROKER_URL, 但是同一个配置文件中不能混合大小写,同时写 BROKER_URL 和 result_backend 就不行了。

然后在 tasks.py 中加载配置文件

1

2

3

4

from celery import Celery

import celeryconfig

app = Celery('celery-demo')

app.config_from_object(celeryconfig)

Celery 实时监控工具

Flower 是一个基于 Web 的监控 Celery 中任务的工具,安装和启动

$ pip install flower
$ celery -A tasks flower

打开链接 http://localhost:5555

其他剩下的问题,应该就是如何安排 Worker(比如结合 AutoScaling),从 Python 代码中启动 Worker, 怎么做灵活的配置, 调度任务的执行,其他的 backend 选择等等。

其他补充

backend rpc:// 的组合

如果配置中用

1

2

broker_url = 'amqp://celery:password@192.168.86.50:5672/celery'

result_backend = 'rpc://'

amqp 和 rpc:// 的组合,任务和结果都会存在 RabbitMQ 中

1

2

broker_url = 'redis://192.168.86.50'

result_backend = 'rpc://'

redis 和  rpc:// 的组合,任务和结果都保存在 Redis 中

为什么 Celery 推荐使用 RabbitMQ, 一说是它的一开发人员负责开发过 RabbitMQ, 所以即使使用 Redis 时,也会在 Redis 中写入有关 RabbitMQ 概念的数据,如 exchange, routing key 等。

 常见问题

Celery ValueError: not enough values to unpack (expected 3, got 0)的解决方案

先安装eventlet

pip install eventlet

然后,启动worker的时候加一个参数,如下:

celery -A <moduleName> worker -l info -P eventlet

然后就可以正常运行worker执行任务了

相关文章:

Celery(分布式任务队列)入门学习笔记

Celery 的简单介绍 用 Celery 官方的介绍&#xff1a;它是一个分布式任务队列; 简单&#xff0c;灵活&#xff0c;可靠的处理大量消息的分布式系统; 它专注于实时处理&#xff0c;并支持任务调度。 Celery 如果使用 RabbitMQ 作为消息系统的话&#xff0c;整个应用体系就是下…...

【网络】tcp协议如何保证可靠性

TCP&#xff08;Transmission Control Protocol&#xff09;是一种面向连接的、可靠的传输层协议&#xff0c;为网络通信提供了可靠性和连接稳定性。本文将详细介绍 TCP 协议如何保证数据的可靠传输和连接的稳定性&#xff0c;并分析其优缺点。 可靠性保证 序号和确认机制&…...

select,poll,epoll

在 Linux Socket 服务器短编程时&#xff0c;为了处理大量客户的连接请求&#xff0c;需要使用非阻塞I/O和复用&#xff0c;select&#xff0c;poll 和 epoll 是 Linux API 提供的I/O复用方式。 \selectpollepoll操作方式遍历遍历回调底层实现数组链表哈希表IO效率每次调用都进…...

【48天笔试强训】day18

题目1 描述 有一种兔子&#xff0c;从出生后第3个月起每个月都生一只兔子&#xff0c;小兔子长到第三个月后每个月又生一只兔子。 例子&#xff1a;假设一只兔子第3个月出生&#xff0c;那么它第5个月开始会每个月生一只兔子。 一月的时候有一只兔子&#xff0c;假如兔子都…...

链表经典面试题01

目录 引言 面试题01:返回倒数第k个节点 题目描述: 思路分析: 代码展示: 面试题02:链表的回文结构 题目描述: 描述 思路分析: 代码展示: 面试题03:相交链表 题目描述: 思路分析: 代码展示: 小结: 引言 这次的题均来自力扣和牛客有关链表的经典面试题,代码只会展示…...

基于java的CRM客户关系管理系统的设计与实现(论文 + 源码 )

【免费】基于Java的CRM客户关系管理系统的设计和实现.zip资源-CSDN文库https://download.csdn.net/download/JW_559/89273409 基于Java的CRM客户关系管理系统的设计与实现 摘 要 随着互联网的高速发展&#xff0c;市场经济的信息化&#xff0c;让企业之间的竞争变得&#xff0…...

【动态规划-最长上升子序列模型part2】:拦截导弹、导弹防御系统、最长公共上升子序列【已更新完成】

1、拦截导弹 某国为了防御敌国的导弹袭击&#xff0c;发展出一种导弹拦截系统。 但是这种导弹拦截系统有一个缺陷&#xff1a;虽然它的第一发炮弹能够到达任意的高度&#xff0c;但是以后每一发炮弹都不能高于前一发的高度。 某天&#xff0c;雷达捕捉到敌国的导弹来袭。 由于…...

Spring 如何解决 Bean 循环依赖

循环依赖解释 bean A 属性注入时依赖bean B &#xff0c;并且bean B属性注入时也依赖bean A &#xff0c;造成 bean A 和bean B 都无法完成初始化问题&#xff0c;形成了闭环。 注意 项目中存在Bean的循环依赖&#xff0c;是Bean对象职责划分不明确、代码质量不高的表现&#…...

【driver4】锁,错误码,休眠唤醒,中断,虚拟内存,tasklet

文章目录 1.互斥锁和自旋锁选择&#xff1a;自旋锁&#xff08;开销少&#xff09;的自旋时间和被锁住的代码执行时间成正比关系2.linux错误码&#xff1a;64位系统内核空间最后一页地址为0xfffffffffffff000~0xffffffffffffffff&#xff0c;这段地址是被保留的&#xff0c;如果…...

python之 函数相关知识解析

01 函数的注释与嵌套 1.函数的注释 函数的注释与普通注释的区别&#xff1a;用来说明当前函数的参数含义 param 参数名: 参数的注释信息 return: 函数的返回值 例如&#xff1a; def fun1(name):""":param name: 参数的注释信息:return: 函数的返回值"…...

监视器和显示器的区别,普通硬盘和监控硬盘的区别

监视器与显示器的区别&#xff0c;你真的知道吗&#xff1f; 中小型视频监控系统中&#xff0c;显示系统是最能展现效果的一个重要环节&#xff0c;显示系统的优劣将直接影响视频监控系统的用户体验满意度。 中小型视频监控系统中&#xff0c;显示系统是最能展现效果的一个重要…...

Linux:升级OpenSSL和OpenSSH

原因是现有版本存在安全漏洞&#xff0c;需要升级到新版本 原有版本和升级后的版本 OpenSSL 1.0.2k-fips 26 Jan 2017 -> OpenSSL 1.1.1w 11 Sep 2023OpenSSH_7.4p1, OpenSSL 1.0.2k-fips 26 Jan 2017 -> OpenSSH_9.5p1, OpenSSL 1.1.1w 11 Sep 2023目录 查看现有版…...

方法的入栈和出栈

一.作用域问题 1.全局作用域 在全局都能进行访问的变量 var a 10;function fn() {var b 20;return a b;}console.log(fn()); 2.局部的作用域 只能在限定的范围内进行访问 function fn() {var b 20;}console.log(b); b is not defined 打印的结果是b这个变量没用定义 3…...

PHP介绍

&#x1f40c;博主主页&#xff1a;&#x1f40c;​倔强的大蜗牛&#x1f40c;​ &#x1f4da;专栏分类&#xff1a;PHP❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 目录 一、PHP是什么&#xff1f; 二、 PHP 文件是什么&#xff1f; 三、PHP能做什么&#xff1f; 四、P…...

接口自动化测试之-requests模块详解

一、requests背景 Requests 继承了urllib2的所有特性。Requests支持HTTP连接保持和连接池&#xff0c;支持使用cookie保持会话&#xff0c;支持文件上传&#xff0c;支持自动确定响应内容的编码&#xff0c;支持国际化的 URL 和 POST 数据自动编码。 二、requests安装 利用p…...

低代码+定制物资管理:创新解决方案探析

引言 在当今快速变化的商业环境中&#xff0c;企业面临着不断增长的挑战&#xff0c;如提高效率、降低成本、满足客户需求等。为了应对这些挑战&#xff0c;企业需要不断创新并采用先进的技术解决方案。在这样的背景下&#xff0c;低代码开发和定制化物资管理成为了引领企业变…...

13 【PS作图】人物绘画理论-脸型

三庭五眼 三庭&#xff1a;脸的长度比例 &#xff08;1&#xff09;发际线到眉毛 &#xff08;2&#xff09;眉毛到鼻底 &#xff08;3&#xff09;鼻底到下巴 三个部分大致为三等分 五眼&#xff1a;脸的宽度比例 以眼睛长度为单位&#xff0c;把脸的宽度分成五等分&#x…...

Python批量修改图片文件名中的指定名称

批量处理图像时&#xff0c;图片名有时需要统一&#xff0c;本教程仅针对图片中名如&#xff1a;0001x4.png&#xff0c;批量将图片名中的x4去除&#xff0c;只留下0001.png的情况。 如果想要按照原图片顺序批量修改图片名&#xff0c;参考其它博文&#xff1a;按照原顺序批量…...

STM32点灯大师(点了一颗LED灯,轮询法)

配置操作&#xff1a; 一、使用CubeMX配置到大致的操作 1.1 选择芯片 1.2 选择引脚&#xff08;根据电路图&#xff09; 1.3 配置gpio口 1.4 配置系统 1.5文件项目操作 最后就是点击 二、点击CubeMX生成的代码&#xff0c;并且修改代码 2.1 看看效果 2.2 写代码...

动态规划算法:路径问题

例题一 解法&#xff08;动态规划&#xff09;&#xff1a; 算法思路&#xff1a; 1. 状态表⽰&#xff1a; 对于这种「路径类」的问题&#xff0c;我们的状态表⽰⼀般有两种形式&#xff1a; i. 从 [i, j] 位置出发&#xff0c;巴拉巴拉&#xff1b; ii. 从起始位置出…...

盘一盘接口测试的那些痛点,你现在会解决了吗

前言 说到接口测试&#xff0c;想必大家一定不会陌生。接口测试就是测试系统组件间&#xff0c;接口对接是否顺畅的一种测试。包括测试数据能否交换、能否传递、能否正常控制管理过程&#xff0c;以及系统间的相互逻辑依赖关系&#xff0c;等等。 由于接口测试主要是检测系统…...

基于alpha shapes的边缘点提取(matlab)

1、原理介绍 由Edelsbrunner H提出的alpha shapes算法是一种简单、有效的快速提取边界点算法。其克服了点云边界点形状影响的缺点&#xff0c;可快速准确提取边界点。如下图所示&#xff0c;对于任意形状的平面点云&#xff0c;若一个半径为a的圆&#xff0c;绕其进行滚动&…...

C#三人飞行棋

C#三人飞行棋 #region 1控制台设置int w 50, h 30; ConsoleInit(w, h); #endregion#region 2 场景选择实例//声明一个表示场景标识的变量 E_SceneType nowSceneType new E_SceneType(); while (true) {switch (nowSceneType){case E_SceneType.Begion://开始场景逻辑Consol…...

Notes for the missing semester. Useful and basic knowledge about Linux.

The Shell Contents The first course is to introduce some simple commands. I’ll list some commands that I’m not familiar with: # --silent means dont give log info, # --head means we only want the http head. curl --head --silent bing.com.cn# cut --deli…...

【信息系统项目管理师知识点速记】资源管理基础

项目团队 执行项目工作,实现项目目标的一组人员。成员具备不同技能,可全职或兼职,随项目进展而变化。参与项目规划和决策,贡献专业技能,增强对项目的责任感。项目管理团队 直接参与项目管理活动的成员,负责项目管理和领导。负责项目各阶段的启动、规划、执行、监督、控制…...

Android性能优化面试题汇总

Android的性能优化涉及多个方面,如启动优化、稳定性优化、内存优化、网络优化、电量优化、安全优化等方面。 一、稳定性优化 1.1 你们做了哪些稳定性方面的优化 随着项目的逐渐成熟,用户基数逐渐增多,DAU持续升高,我们遇到了很多稳定性方面的问题,对于我们技术同学遇到…...

Ansible 自动化运维工具 - 了解和模块应用

目录 一. Ansible 的相关知识 1.1 Ansible 工具的简介 1.2 Ansible的四大组件 1.3 运维自动化工具 1.4 Ansible 和其它自动化运维工具对比 1.5 Ansible 的优缺点 二. Ansible 环境安装部署 2.1 管理端安装 ansible 2.2 配置主机清单 三. ansible 命令行模块 3.1 comm…...

AI神助攻!小白也能制作自动重命名工具~

我们平时从网上下载一些文件&#xff0c;文件名很多都是一大串字母和数字&#xff0c;不打开看看&#xff0c;根本不知道里面是什么内容。 我想能不能做个工具&#xff0c;把我们一个文件夹下面的所有word、excel、ppt、pdf文件重命名为文件内容的第一行。 我们有些朋友可能不会…...

(读书笔记-大模型) LLM Powered Autonomous Agents

目录 智能体系统的概念 规划组件 记忆组件 工具组件 案例研究 智能体系统的概念 在大语言模型&#xff08;LLM&#xff09;赋能的自主智能体系统中&#xff0c;LLM 充当了智能体的大脑&#xff0c;其三个关键组件分别如下&#xff1a; 首先是规划&#xff0c;它又分为以下…...

超分辨率重建——BSRN网络训练自己数据集并推理测试(详细图文教程)

目录 一、BSRN网络总结二、源码包准备三、环境准备3.1 报错KeyError: "No object named BSRN found in arch registry!"3.2 安装basicsr源码包3.3 参考环境 四、数据集准备五、训练5.1 配置文件参数修改5.2 启动训练5.2.1 命令方式训练5.2.2 配置Configuration方式训…...

做推广比较好的网站/十大最靠谱it培训机构

基于gentooapachephpmysql的源码包安装&#xff01; 由于gentoo的难度太大&#xff0c;初次学习建议使用红帽linux! lamp编译所需要的软件&#xff01; 1. libxml2-2.6.30.tar.gz 2. libmcrypt-2.5.8.tar.gz 3. zlib-1.2.3.tar.gz 4. libpng-1.2.31.tar.gz 5. jpegsrc.v6b.tar.…...

网站开发颜色代码/新浪舆情通

目录 1 说明 2 支持exFAT 3 支持中文文件名 3.1 修改内核 3.2 使用mount系统调用 1 说明 本文内容来源于调试hi3531A的过程中总结的经验。 2 支持exFAT linux发行版解决支持exFAT的问题很简单&#xff0c;通过包管理器安装相应的软件包即可&#xff0c;比如Ubuntu&…...

中国建设银行网站客户注册码/查排名

C 递归 递归指的是在函数的定义中使用函数自身的方法。 举个例子&#xff1a; 从前有座山&#xff0c;山里有座庙&#xff0c;庙里有个老和尚&#xff0c;正在给小和尚讲故事呢&#xff01;故事是什么呢&#xff1f;"从前有座山&#xff0c;山里有座庙&#xff0c;庙里有…...

中国安能建设总公司网站/营销网站的宣传、推广与运作

题目大意&#xff1a;定义无向图生成树的最大边与最小边的差为苗条度&#xff0c;找出苗条度最小的生成树的苗条度。 题目分析&#xff1a;先将所有边按权值从小到大排序&#xff0c;在连续区间[L,R]中的边如果能构成一棵生成树&#xff0c;那么这棵树一定有最小的苗条度。枚举…...

中天建设集团有限公司第九建设公司/德州seo优化

对列的顺序进行处理 order [date, time, open, high] df df[order]会按照新的列规则来进行排列...

wordpress 平铺相册/一手渠道推广平台

该文章是一个系列文章&#xff0c;是本人在Android开发的漫漫长途上的一点感想和记录&#xff0c;我会尽量按照先易后难的顺序进行编写该系列。该系列引用了《Android开发艺术探索》以及《深入理解Android 卷Ⅰ&#xff0c;Ⅱ&#xff0c;Ⅲ》中的相关知识&#xff0c;另外也借…...