Flask与Celery实现Python调度服务
文章目录
- Flask与Celery实现Python调度服务
- 一、前言
- 1.组件
- 2.场景说明
- 3.环境
- 二、安装依赖
- 1.安装Anaconda
- 3.安装redis
- 2.安装依赖包
- 三、具体实现
- 1.目录结构
- 2.业务流程
- 3.配置文件
- 4.Celery程序
- 5.Flask程序
- 6.测试脚本
- 7.程序启动
- 1)Windows开发调试
- 2)Linux服务器部署
- 3)订阅通道
- 4)测试
Flask与Celery实现Python调度服务
一、前言
1.组件
-
Flask:Flask 是一个轻量级的 Python Web 应用框架。Flask 提供了开发 Web 应用所需的基本功能,并且设计灵活,开发者可以根据需要扩展其功能。由于其核心简洁,它特别适合小型应用和微服务架构。同时,Flask 具有很高的可扩展性,通过各种扩展库可以轻松添加数据库集成、表单处理、身份验证等功能。
-
Celery:Celery 是一个异步任务队列/作业队列,基于分布式消息传递的系统。Celery 用于处理异步任务和调度定时任务,非常适合在后台处理耗时的操作,比如发送邮件、生成报告或与外部 API 交互。它通过任务队列和工作进程分离工作负载,可以提高应用程序的性能和响应速度。
-
uWSGI:uWSGI 是一个应用服务器,用于运行 Python Web 应用。uWSGI 旨在高效地服务 Python 应用,并提供了 WSGI 协议的实现。它通常用于在生产环境中部署 Python 应用。uWSGI 支持多种协议和功能,包括负载均衡、进程管理等。它与 Web 服务器(如 Nginx 或 Apache)配合使用,以提高应用的性能和可靠性。
-
Redis:Redis 是一个开源的内存数据结构存储系统,可以用作数据库、缓存和消息中间件。Redis 通过键值对的方式存储数据,支持丰富的数据结构如字符串、哈希、列表、集合和有序集合。它具有极高的读写性能,常用于缓存数据库查询结果、会话存储、队列管理等场景,以减少数据库的压力并提高系统的响应速度。Redis 还支持持久化,可以在重启后恢复数据。
2.场景说明
- 解决在 java 项目中执行 python 脚本的问题;之前实现的方法是通过 Java 程序模拟命令行的交互环境执行 python 脚本,但是无法控制执行脚本的并发,导致服务器硬件资源占用过高的情况时常发生。
3.环境
- Windows 版本(开发):Windows 10 专业版
- Linux 发行版(部署):CentOS-7-x86_64-DVD-1804.iso
- Postman for Windows Version:11.3.2
- flask:2.2.5
- celery:5.4.0
- redis 客户端:5.0.4
- redis 服务端:7.0.12
- uwsgi:2.0.21
Postman 下载:https://www.postman.com/downloads/
flask框架入门和使用实践:https://blog.csdn.net/u011424614/article/details/112548442
Java执行Python脚本:https://blog.csdn.net/u011424614/article/details/114199102
[Windows] Anacoda安装和使用:https://blog.csdn.net/u011424614/article/details/105579502CentOS7安装部署Anaconda:https://blog.csdn.net/u011424614/article/details/140253920
CentOS7安装部署Redis7:https://blog.csdn.net/u011424614/article/details/132418619
二、安装依赖
1.安装Anaconda
- 安装参考:《CentOS7安装部署Anaconda》
- 安装目录:
/opt/anaconda/install
3.安装redis
- 安装参考:《CentOS7安装部署Redis7 》
- 安装目录:
/opt/redis/redis-7.0.12/src
2.安装依赖包
# 创建环境
conda create -n dev01 python==3.10.14
# 激活环境
conda activate dev01# 安装依赖包(指定国内镜像源)
pip install flask==2.2.5 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install celery==5.4.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install redis==5.0.4 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install werkzeug==2.3.8 -i https://pypi.tuna.tsinghua.edu.cn/simple
# Windows 开发环境不需要安装,Linux 部署时使用
conda install uwsgi==2.0.21 -i https://pypi.tuna.tsinghua.edu.cn/simple
三、具体实现
1.目录结构
/opt/scheduler-service
├── logs
├── config
│ ├── redis_config.ini
│ └── uwsgi.ini
├── test
│ ├── my_script.py
├── celery_service.py
├── celery_worker_start.py
├── flask_service.py
└── flask_service_start.py
2.业务流程
执行方式:异步执行
- Postman 发送请求后,调用 flask 程序
- flask 将根据接口参数,通过 celery 异步执行指定脚本,并传递脚本所需参数;celery 异步执行后,获取 celery 任务的 task_id;最后通过 flask 将 task_id 返回给 Postman
- celery 执行脚本成功或失败,都会将执行结果(包含 task_id)发布到 Redis 的指定通道
- Redis 客户端提前订阅通道,监听通道的消息(通过 task_id 匹配执行任务和结果)
3.配置文件
- redis_config.ini
[celery]
# 配置 Celery 的消息代理 URL
broker_url = redis://127.0.0.1:6379/2# 配置 Celery 的结果后端 URL
backend_url = redis://127.0.0.1:6379/3[redis]
# 配置 Redis 的主机地址
host = 127.0.0.1# 配置 Redis 的端口
port = 6379# 配置 Redis 的数据库索引
db = 4# 配置 Redis 的密码,如果没有密码留空
password =[channel]
# 配置消息发布的频道名称
name = schedulerChannel
- uwsgi.ini【Windows 开发环境不需要,Linux部署时使用】
[uwsgi]
# 使用 HTTP 协议,监听 6000 端口
http = :6000# 设置 uWSGI 的工作目录为 Flask 应用所在目录
chdir = ./# 指定 Flask 应用所在的 Python 文件和变量名
module = flask_app:app# 启用主进程
master = true# 启动的工作进程数,推荐设置为 CPU 核心数的 2-4 倍
processes = 4# 线程数,可以根据需要设置
threads = 2# 指定静态文件的目录,如果有静态文件服务的需求
# static-map = /static=/path/to/static/files# 设置 Python 的自动加载,使得每次修改代码后无需重启服务
py-autoreload = 1# 设置日志文件路径
daemonize = /opt/scheduler-service/logs/uwsgi.log
4.Celery程序
- celery_app.py
from celery import Celery
from celery.signals import task_success, task_failure
import importlib.machinery
import redis
from dataclasses import dataclass, asdict
import json
import configparser# 读取配置文件
config = configparser.ConfigParser()
config.read('config/redis_config.ini', encoding='utf-8')# 初始化 Celery 应用
# 设置消息代理(Broker)和结果后端(Backend)
app = Celery('tasks',broker=config['celery']['broker_url'],backend=config['celery']['backend_url']
)# 创建 Redis 连接
client = redis.Redis(host=config['redis']['host'],port=config.getint('redis', 'port'),db=config.getint('redis', 'db'),password=config['redis']['password'])# 频道名称
channel_name = config['channel']['name']# 定义消息数据模型
@dataclass
class Message:success: boolmsg: strtaskId: strresult: str# 任务成功完成时触发的事件
@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):print(f"任务 {sender.request.id} 执行成功.")# 构建消息message = Message(success=True, msg='', taskId=sender.request.id, result=result)# 发布消息到频道msg_json = json.dumps(asdict(message), ensure_ascii=False)client.publish(channel_name, msg_json)# 任务失败时触发的事件
@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, args=None, kwargs=None, traceback=None, einfo=None, **kw):print(f"任务 {task_id} 执行失败,异常: {exception}")# 构建消息message = Message(success=False, msg=str(exception), taskId=task_id, result=None)# 发布消息到频道msg_json = json.dumps(asdict(message), ensure_ascii=False)client.publish(channel_name, msg_json)# 定义 Celery 任务
@app.task
def execute_script(script_path, *args, **kwargs):try:# 使用 SourceFileLoader 加载模块loader = importlib.machinery.SourceFileLoader("module.name", script_path)module = loader.load_module()# 执行模块中的 main 函数result = module.main(*args, **kwargs)return resultexcept Exception as e:print(f"执行过程异常: {e}", exc_info=True)raise
- celery_worker_start.py
from celery_app import appif __name__ == '__main__':# 在 Windows 环境下运行 Celery Worker# 使用 solo 模式以避免 Windows 下多进程问题app.worker_main(argv=['worker', '--loglevel=info', '--pool=solo'])# 在 Linux 环境下运行 Celery Worker# 使用 8 个并发工作进程# app.worker_main(argv=['worker', '--loglevel=info', '--concurrency=8'])
5.Flask程序
- flask_app.py
from flask import Flask, request
from celery_app import execute_script
from dataclasses import dataclass, asdict
import json# 初始化 Flask 应用实例
app = Flask(__name__)# 定义数据模型
@dataclass
class Message:success: boolmsg: strtaskId: str# 定义执行脚本的接口
@app.route('/execute_script', methods=['POST'])
def run_script():# 获取请求数据data = request.get_json()# 异步执行脚本并获取任务 IDtask = execute_script.delay(data['script_path'], *data['args'])task_id = task.idprint(f"flask_app > task_id: {task_id}")try:# 创建成功消息message = Message(success=True, msg='', taskId=task_id)msg_json = json.dumps(asdict(message), ensure_ascii=False)return msg_json, 200except Exception as e:# 捕获并处理异常,返回错误消息message = Message(success=False, msg=str(e), taskId=task_id)msg_json = json.dumps(asdict(message), ensure_ascii=False)return msg_json, 500
- flask_app_start.py
from flask_app import app# 运行 Flask 应用
if __name__ == '__main__':# 以调试模式启动 Flask 应用# 设置 host 为 '0.0.0.0' 以允许外部访问# 端口号为 6000# 使用 reloader 以便在代码更改时自动重启服务器app.run(debug=True, host='0.0.0.0', port=6000, use_reloader=True)
6.测试脚本
- test_script.py
import time# main 方法作为入口
def main(x, y):# 阻塞 5 秒time.sleep(5)# 计算结果result = x * y# 打印乘法计算结果print(f"乘法计算: {x} X {y} = {result}")# 返回计算结果return result
7.程序启动
1)Windows开发调试
- 运行
flask_app_start.py
和celery_worker_start.py
2)Linux服务器部署
- 未安装 uwsgi
nohup python celery_worker_start.py > logs/celery.$(date +%Y-%m-%d).log 2>&1 &
nohup python flask_app_start.py > logs/flask.$(date +%Y-%m-%d).log 2>&1 &
- 已安装 uwsgi
nohup python celery_worker_start.py > logs/celery.$(date +%Y-%m-%d).log 2>&1 &
uwsgi --ini config/uwsgi.ini
3)订阅通道
- Redis 客户端订阅通道
redis-cli subscribe schedulerChannel
4)测试
- 使用 Springboot 等可以发送请求的程序进行测试,不限编程语言
- 使用 Postman for Windows 发送请求进行测试(当前文章使用的方法)
Postman配置:
- POST 请求:
http://192.168.28.179:6000/execute_script
- Headers 配置(配置 Key 和 Value ):
Content-Type
:application/json
Accept
:application/json
- Body - raw 配置(测试脚本和参数):
{"script_path": "E:\\scheduler-service\\test\\test_script.py","args": [1, 5]
}
相关文章:

Flask与Celery实现Python调度服务
文章目录 Flask与Celery实现Python调度服务一、前言1.组件2.场景说明3.环境 二、安装依赖1.安装Anaconda3.安装redis2.安装依赖包 三、具体实现1.目录结构2.业务流程3.配置文件4.Celery程序5.Flask程序6.测试脚本7.程序启动1)Windows开发调试2)Linux服务…...

Eureka应用场景和优势
Eureka是一款由Netflix开源的服务注册与发现框架,在微服务架构中扮演着至关重要的角色。以下是Eureka的应用场景和优势: Eureka的应用场景 Eureka主要应用于微服务架构中,特别是在大型、复杂的分布式系统中,用于管理和发现服务。…...

prompt第三讲-PromptTemplate
文章目录 前提回顾PromptTemplateprompt 模板定义以f-string渲染格式以mustache渲染格式以jinja2渲染格式直接实例化PromptTemplatePromptTemplate核心变量 prompt value生成invokeformat_prompt(不建议使用)format(不建议使用) batchstreamainvoke PromptTemplate核心方法part…...

卷积神经网络图像识别车辆类型
卷积神经网络图像识别车辆类型 1、图像 自行车: 汽车: 摩托车: 2、数据集目录 3、流程 1、获取数据,把图像转成矩阵,并随机划分训练集、测试集 2、把标签转为数值,将标签向量转换为二值矩阵 3、图像数据归一化,0-1之间的值 4、构造卷积神经网络 5、设置图像输入…...

【接口设计】用 Swagger 实现接口文档
用 Swagger 实现接口文档 1.配置 Swagger1.1 添加 Swagger 依赖1.2 创建 Swagger 配置类 2.编写接口文档 在项目开发中,一般都是由前后端工程师共同定义接口,编写接口文档,之后大家根据这个接口文档进行开发、维护。为了便于编写和维护稳定&a…...

TensorFlow系列:第四讲:MobileNetV2实战
一. 加载数据集 编写工具类,实现数据集的加载 import keras""" 加载数据集工具类 """class DatasetLoader:def __init__(self, path_url, image_size(224, 224), batch_size32, class_modecategorical):self.path_url path_urlself…...

Redis+Caffeine 实现两级缓存实战
RedisCaffeine 实现两级缓存 背景 事情的开始是这样的,前段时间接了个需求,给公司的商城官网提供一个查询预计送达时间的接口。接口很简单,根据请求传的城市仓库发货时间查询快递的预计送达时间。因为商城下单就会调用这个接口ÿ…...

SpringBoot:SpringBoot中如何实现对Http接口进行监控
一、前言 Spring Boot Actuator是Spring Boot提供的一个模块,用于监控和管理Spring Boot应用程序的运行时信息。它提供了一组监控端点(endpoints),用于获取应用程序的健康状态、性能指标、配置信息等,并支持通过 HTTP …...

STM32-I2C硬件外设
本博文建议与我上一篇I2C 通信协议共同理解 合成一套关于I2C软硬件体系 STM32内部集成了硬件I2C收发电路,可以由硬件自动执行时钟生成、起始终止条件生成、应答位收发、数据收发等功能,减轻CPU的负担 特点: 多主机功能&#x…...

暑假第一次作业
第一步:给R1,R2,R3,R4配IP [R1-GigabitEthernet0/0/0]ip address 192.168.1.1 24 [R1-Serial4/0/0]ip address 15.0.0.1 24 [R2-GigabitEthernet0/0/0]ip address 192.168.2.1 24 [R2-Serial4/0/0]ip address 25.0.0.1 24 [R3-GigabitEthernet0/0/0]ip address 192.…...

【算法专题】快速排序
1. 颜色分类 75. 颜色分类 - 力扣(LeetCode) 依据题意,我们需要把只包含0、1、2的数组划分为三个部分,事实上,在我们前面学习过的【算法专题】双指针算法-CSDN博客中,有一道题叫做移动零,题目要…...

debian 12 PXE Server 批量部署系统
pxe server 前言 PXE(Preboot eXecution Environment,预启动执行环境)是一种网络启动协议,允许计算机通过网络启动而不是使用本地硬盘。PXE服务器是实现这一功能的服务器,它提供了启动镜像和引导加载程序,…...

【Pytorch】RNN for Image Classification
文章目录 1 RNN 的定义2 RNN 输入 input, h_03 RNN 输出 output, h_n4 多层5 小试牛刀 学习参考来自 pytorch中nn.RNN()总结RNN for Image Classification(RNN图片分类–MNIST数据集)pytorch使用-nn.RNNBuilding RNNs is Fun with PyTorch and Google Colab 1 RNN 的定义 nn.…...

基于Java的飞机大战游戏的设计与实现论文
点击下载源码 基于Java的飞机大战游戏的设计与实现 摘 要 现如今,随着智能手机的兴起与普及,加上4G(the 4th Generation mobile communication ,第四代移动通信技术)网络的深入,越来越多的IT行业开始向手机…...

初识影刀:EXCEL根据部门筛选低值易耗品
第一次知道这个办公自动化的软件还是在招聘网站上,了解之后发现对于办公中重复性的工作还是挺有帮助的,特别是那些操作非EXCEL的重复性工作,当然用在EXCEL上更加方便,有些操作比写VBA便捷。 下面就是一个了解基本操作后ÿ…...

nginx的四层负载均衡实战
目录 1 环境准备 1.1 mysql 部署 1.2 nginx 部署 1.3 关闭防火墙和selinux 2 nginx配置 2.1 修改nginx主配置文件 2.2 创建stream配置文件 2.3 重启nginx 3 测试四层代理是否轮循成功 3.1 远程链接通过代理服务器访问 3.2 动图演示 4 四层反向代理算法介绍 4.1 轮询࿰…...

中职网络安全B模块Cenots6.8数据库
任务环境说明: ✓ 服务器场景:CentOS6.8(开放链接) ✓ 用户名:root;密码:123456 进入虚拟机操作系统:CentOS 6.8,登陆数据库(用户名:root&#x…...

BGP笔记的基本概要
技术背景: 在只有IGP(诸如OSPF、IS-IS、RIP等协议,因为最初是被设计在一个单域中进行一个路由操纵,因此被统一称为Interior Gateway Protocol,内部网关协议)的时代,域间路由无法实现一个全局路由…...

【Redis】复制(Replica)
文章目录 一、复制是什么?二、 基本命令三、 配置(分为配置文件和命令配置)3.1 配置文件3.2 命令配置3.3 嵌套连接3.4 关闭从属关系 四、 复制原理五、 缺点 以下是本篇文章正文内容 一、复制是什么? 主从复制 masterÿ…...

封装了一个仿照抖音效果的iOS评论弹窗
需求背景 开发一个类似抖音评论弹窗交互效果的弹窗,支持滑动消失, 滑动查看评论 效果如下图 思路 创建一个视图,该视图上面放置一个tableView, 该视图上添加一个滑动手势,同时设置代理,实现代理方法 (BOOL)gestur…...

【JavaWeb程序设计】Servlet(二)
目录 一、改进上一篇博客Servlet(一)的第一题 1. 运行截图 2. 建表 3. 实体类 4. JSP页面 4.1 login.jsp 4.2 loginSuccess.jsp 4.3 loginFail.jsp 5. mybatis-config.xml 6. 工具类:创建SqlSessionFactory实例,进行 My…...

php探针
php探针是用来探测空间、服务器运行状况和PHP信息用的,探针可以实时查看服务器硬盘资源、内存占用、网卡流量、系统负载、服务器时间等信息。 下面就分享下我是怎样利用php探针来探测服务器网站空间速度、性能、安全功能等。 具体步骤如下: 1.从网上下…...

泰勒级数 (Taylor Series) 动画展示 包括源码
泰勒级数 (Taylor Series) 动画展示 包括源码 flyfish 泰勒级数(英语:Taylor series)用无限项连加式 - 级数来表示一个函数,这些相加的项由函数在某一点的导数求得。 定义了一个函数f(x)表示要近似的函数 sin ( x ) \sin(x) …...

蔚来汽车:拥抱TiDB,实现数据库性能与稳定性的飞跃
作者: Billdi表弟 原文来源: https://tidb.net/blog/449c3f5b 演讲嘉宾:吴记 蔚来汽车Tidb爱好者 整理编辑:黄漫绅(表妹)、李仲舒、吴记 本文来自 TiDB 社区合肥站走进蔚来汽车——来自吴记老师的演讲…...

【Django+Vue3 线上教育平台项目实战】构建高效线上教育平台之首页模块
文章目录 前言一、导航功能实现a.效果图:b.后端代码c.前端代码 二、轮播图功能实现a.效果图b.后端代码c.前端代码 三、标签栏功能实现a.效果图b.后端代码c.前端代码 四、侧边栏功能实现1.整体效果图2.侧边栏功能实现a.效果图b.后端代码c.前端代码 3.侧边栏展示分类及…...

对比 UUIDv1 和 UUIDv6
UUIDv6是UUIDv1的字段兼容版本,重新排序以改善数据库局部性。UUIDv6主要在使用UUIDv1的上下文中实现。不涉及遗留UUIDv1的系统应该改用UUIDv7。 与 UUIDv1 将时间戳分割成低、中、高三个部分不同,UUIDv6 改变了这一序列,使时间戳字节从最重要…...

记一次饱经挫折的阿里云ROS部署经历
前言 最近在参加的几个项目测评里,我发现**“一键部署”这功能真心好用,省下了不少宝贵时间和力气,再加上看到阿里云现在有个开源上云**的活动。趁着这波热潮,今天就聊聊怎么从头开始,一步步搞定阿里云的资源编排服务…...

代码运行故障排除:PyCharm中的问题解决指南
代码运行故障排除:PyCharm中的问题解决指南 引言 PyCharm,作为一款流行的集成开发环境(IDE),提供了强大的工具来支持Python开发。然而,即使是最先进的IDE也可能遇到代码无法运行的问题。这些问题可能由多…...

css实现渐进中嵌套渐进的方法
这是我们想要的实现效果: 思路: 1.有一个底色的背景渐变 2.需要几个小的块级元素做绝对定位通过渐变filter模糊来实现 注意:这里的采用的定位方法,所以在内部的元素一律要使用绝对定位,否则会出现层级的问题&…...

JavaWeb后端学习
Web:全球局域网,万维网,能通过浏览器访问的网站 Maven Apache旗下的一个开源项目,是一款用于管理和构建Java项目的工具 作用: 依赖管理:方便快捷的管理项目以来的资源(jar包)&am…...