当前位置: 首页 > news >正文

Flask与Celery实现Python调度服务

文章目录

  • Flask与Celery实现Python调度服务
  • 一、前言
    • 1.组件
    • 2.场景说明
    • 3.环境
  • 二、安装依赖
    • 1.安装Anaconda
    • 3.安装redis
    • 2.安装依赖包
  • 三、具体实现
    • 1.目录结构
    • 2.业务流程
    • 3.配置文件
    • 4.Celery程序
    • 5.Flask程序
    • 6.测试脚本
    • 7.程序启动
      • 1)Windows开发调试
      • 2)Linux服务器部署
      • 3)订阅通道
      • 4)测试

Flask与Celery实现Python调度服务

一、前言

1.组件

  • Flask:Flask 是一个轻量级的 Python Web 应用框架。Flask 提供了开发 Web 应用所需的基本功能,并且设计灵活,开发者可以根据需要扩展其功能。由于其核心简洁,它特别适合小型应用和微服务架构。同时,Flask 具有很高的可扩展性,通过各种扩展库可以轻松添加数据库集成、表单处理、身份验证等功能。

  • Celery:Celery 是一个异步任务队列/作业队列,基于分布式消息传递的系统。Celery 用于处理异步任务和调度定时任务,非常适合在后台处理耗时的操作,比如发送邮件、生成报告或与外部 API 交互。它通过任务队列和工作进程分离工作负载,可以提高应用程序的性能和响应速度。

  • uWSGI:uWSGI 是一个应用服务器,用于运行 Python Web 应用。uWSGI 旨在高效地服务 Python 应用,并提供了 WSGI 协议的实现。它通常用于在生产环境中部署 Python 应用。uWSGI 支持多种协议和功能,包括负载均衡、进程管理等。它与 Web 服务器(如 Nginx 或 Apache)配合使用,以提高应用的性能和可靠性。

  • Redis:Redis 是一个开源的内存数据结构存储系统,可以用作数据库、缓存和消息中间件。Redis 通过键值对的方式存储数据,支持丰富的数据结构如字符串、哈希、列表、集合和有序集合。它具有极高的读写性能,常用于缓存数据库查询结果、会话存储、队列管理等场景,以减少数据库的压力并提高系统的响应速度。Redis 还支持持久化,可以在重启后恢复数据。

2.场景说明

  • 解决在 java 项目中执行 python 脚本的问题;之前实现的方法是通过 Java 程序模拟命令行的交互环境执行 python 脚本,但是无法控制执行脚本的并发,导致服务器硬件资源占用过高的情况时常发生。

3.环境

  • Windows 版本(开发):Windows 10 专业版
  • Linux 发行版(部署):CentOS-7-x86_64-DVD-1804.iso
  • Postman for Windows Version:11.3.2
  • flask:2.2.5
  • celery:5.4.0
  • redis 客户端:5.0.4
  • redis 服务端:7.0.12
  • uwsgi:2.0.21

Postman 下载:https://www.postman.com/downloads/

flask框架入门和使用实践:https://blog.csdn.net/u011424614/article/details/112548442

Java执行Python脚本:https://blog.csdn.net/u011424614/article/details/114199102
[Windows] Anacoda安装和使用:https://blog.csdn.net/u011424614/article/details/105579502

CentOS7安装部署Anaconda:https://blog.csdn.net/u011424614/article/details/140253920

CentOS7安装部署Redis7:https://blog.csdn.net/u011424614/article/details/132418619

二、安装依赖

1.安装Anaconda

  • 安装参考:《CentOS7安装部署Anaconda》
  • 安装目录:/opt/anaconda/install

3.安装redis

  • 安装参考:《CentOS7安装部署Redis7 》
  • 安装目录:/opt/redis/redis-7.0.12/src

2.安装依赖包

# 创建环境
conda create -n dev01 python==3.10.14
# 激活环境
conda activate dev01# 安装依赖包(指定国内镜像源)
pip install flask==2.2.5 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install celery==5.4.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install redis==5.0.4 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install werkzeug==2.3.8 -i https://pypi.tuna.tsinghua.edu.cn/simple
# Windows 开发环境不需要安装,Linux 部署时使用
conda install uwsgi==2.0.21 -i https://pypi.tuna.tsinghua.edu.cn/simple

三、具体实现

1.目录结构

/opt/scheduler-service
├── logs
├── config
│   ├── redis_config.ini
│   └── uwsgi.ini
├── test
│   ├── my_script.py
├── celery_service.py
├── celery_worker_start.py
├── flask_service.py
└── flask_service_start.py

2.业务流程

执行方式:异步执行

  1. Postman 发送请求后,调用 flask 程序
  2. flask 将根据接口参数,通过 celery 异步执行指定脚本,并传递脚本所需参数;celery 异步执行后,获取 celery 任务的 task_id;最后通过 flask 将 task_id 返回给 Postman
  3. celery 执行脚本成功或失败,都会将执行结果(包含 task_id)发布到 Redis 的指定通道
  4. Redis 客户端提前订阅通道,监听通道的消息(通过 task_id 匹配执行任务和结果)

3.配置文件

  • redis_config.ini
[celery]
# 配置 Celery 的消息代理 URL
broker_url = redis://127.0.0.1:6379/2# 配置 Celery 的结果后端 URL
backend_url = redis://127.0.0.1:6379/3[redis]
# 配置 Redis 的主机地址
host = 127.0.0.1# 配置 Redis 的端口
port = 6379# 配置 Redis 的数据库索引
db = 4# 配置 Redis 的密码,如果没有密码留空
password =[channel]
# 配置消息发布的频道名称
name = schedulerChannel
  • uwsgi.ini【Windows 开发环境不需要,Linux部署时使用】
[uwsgi]
# 使用 HTTP 协议,监听 6000 端口
http = :6000# 设置 uWSGI 的工作目录为 Flask 应用所在目录
chdir = ./# 指定 Flask 应用所在的 Python 文件和变量名
module = flask_app:app# 启用主进程
master = true# 启动的工作进程数,推荐设置为 CPU 核心数的 2-4 倍
processes = 4# 线程数,可以根据需要设置
threads = 2# 指定静态文件的目录,如果有静态文件服务的需求
# static-map = /static=/path/to/static/files# 设置 Python 的自动加载,使得每次修改代码后无需重启服务
py-autoreload = 1# 设置日志文件路径
daemonize = /opt/scheduler-service/logs/uwsgi.log

4.Celery程序

  • celery_app.py
from celery import Celery
from celery.signals import task_success, task_failure
import importlib.machinery
import redis
from dataclasses import dataclass, asdict
import json
import configparser# 读取配置文件
config = configparser.ConfigParser()
config.read('config/redis_config.ini', encoding='utf-8')# 初始化 Celery 应用
# 设置消息代理(Broker)和结果后端(Backend)
app = Celery('tasks',broker=config['celery']['broker_url'],backend=config['celery']['backend_url']
)# 创建 Redis 连接
client = redis.Redis(host=config['redis']['host'],port=config.getint('redis', 'port'),db=config.getint('redis', 'db'),password=config['redis']['password'])# 频道名称
channel_name = config['channel']['name']# 定义消息数据模型
@dataclass
class Message:success: boolmsg: strtaskId: strresult: str# 任务成功完成时触发的事件
@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):print(f"任务 {sender.request.id} 执行成功.")# 构建消息message = Message(success=True, msg='', taskId=sender.request.id, result=result)# 发布消息到频道msg_json = json.dumps(asdict(message), ensure_ascii=False)client.publish(channel_name, msg_json)# 任务失败时触发的事件
@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, args=None, kwargs=None, traceback=None, einfo=None, **kw):print(f"任务 {task_id} 执行失败,异常: {exception}")# 构建消息message = Message(success=False, msg=str(exception), taskId=task_id, result=None)# 发布消息到频道msg_json = json.dumps(asdict(message), ensure_ascii=False)client.publish(channel_name, msg_json)# 定义 Celery 任务
@app.task
def execute_script(script_path, *args, **kwargs):try:# 使用 SourceFileLoader 加载模块loader = importlib.machinery.SourceFileLoader("module.name", script_path)module = loader.load_module()# 执行模块中的 main 函数result = module.main(*args, **kwargs)return resultexcept Exception as e:print(f"执行过程异常: {e}", exc_info=True)raise
  • celery_worker_start.py
from celery_app import appif __name__ == '__main__':# 在 Windows 环境下运行 Celery Worker# 使用 solo 模式以避免 Windows 下多进程问题app.worker_main(argv=['worker', '--loglevel=info', '--pool=solo'])# 在 Linux 环境下运行 Celery Worker# 使用 8 个并发工作进程# app.worker_main(argv=['worker', '--loglevel=info', '--concurrency=8'])

5.Flask程序

  • flask_app.py
from flask import Flask, request
from celery_app import execute_script
from dataclasses import dataclass, asdict
import json# 初始化 Flask 应用实例
app = Flask(__name__)# 定义数据模型
@dataclass
class Message:success: boolmsg: strtaskId: str# 定义执行脚本的接口
@app.route('/execute_script', methods=['POST'])
def run_script():# 获取请求数据data = request.get_json()# 异步执行脚本并获取任务 IDtask = execute_script.delay(data['script_path'], *data['args'])task_id = task.idprint(f"flask_app > task_id: {task_id}")try:# 创建成功消息message = Message(success=True, msg='', taskId=task_id)msg_json = json.dumps(asdict(message), ensure_ascii=False)return msg_json, 200except Exception as e:# 捕获并处理异常,返回错误消息message = Message(success=False, msg=str(e), taskId=task_id)msg_json = json.dumps(asdict(message), ensure_ascii=False)return msg_json, 500
  • flask_app_start.py
from flask_app import app# 运行 Flask 应用
if __name__ == '__main__':# 以调试模式启动 Flask 应用# 设置 host 为 '0.0.0.0' 以允许外部访问# 端口号为 6000# 使用 reloader 以便在代码更改时自动重启服务器app.run(debug=True, host='0.0.0.0', port=6000, use_reloader=True)

6.测试脚本

  • test_script.py
import time# main 方法作为入口
def main(x, y):# 阻塞 5 秒time.sleep(5)# 计算结果result = x * y# 打印乘法计算结果print(f"乘法计算: {x} X {y} = {result}")# 返回计算结果return result

7.程序启动

1)Windows开发调试

  • 运行 flask_app_start.pycelery_worker_start.py

2)Linux服务器部署

  • 未安装 uwsgi
nohup python celery_worker_start.py > logs/celery.$(date +%Y-%m-%d).log 2>&1 &
nohup python flask_app_start.py > logs/flask.$(date +%Y-%m-%d).log 2>&1 &
  • 已安装 uwsgi
nohup python celery_worker_start.py > logs/celery.$(date +%Y-%m-%d).log 2>&1 &
uwsgi --ini config/uwsgi.ini

3)订阅通道

  • Redis 客户端订阅通道
redis-cli subscribe schedulerChannel

4)测试

  1. 使用 Springboot 等可以发送请求的程序进行测试,不限编程语言
  2. 使用 Postman for Windows 发送请求进行测试(当前文章使用的方法

Postman配置:

  1. POST 请求:http://192.168.28.179:6000/execute_script
  2. Headers 配置(配置 Key 和 Value ):
  • Content-Type : application/json
  • Accept : application/json
  1. Body - raw 配置(测试脚本和参数):
{"script_path": "E:\\scheduler-service\\test\\test_script.py","args": [1, 5]
}

相关文章:

Flask与Celery实现Python调度服务

文章目录 Flask与Celery实现Python调度服务一、前言1.组件2.场景说明3.环境 二、安装依赖1.安装Anaconda3.安装redis2.安装依赖包 三、具体实现1.目录结构2.业务流程3.配置文件4.Celery程序5.Flask程序6.测试脚本7.程序启动1)Windows开发调试2)Linux服务…...

Eureka应用场景和优势

Eureka是一款由Netflix开源的服务注册与发现框架,在微服务架构中扮演着至关重要的角色。以下是Eureka的应用场景和优势: Eureka的应用场景 Eureka主要应用于微服务架构中,特别是在大型、复杂的分布式系统中,用于管理和发现服务。…...

prompt第三讲-PromptTemplate

文章目录 前提回顾PromptTemplateprompt 模板定义以f-string渲染格式以mustache渲染格式以jinja2渲染格式直接实例化PromptTemplatePromptTemplate核心变量 prompt value生成invokeformat_prompt(不建议使用)format(不建议使用) batchstreamainvoke PromptTemplate核心方法part…...

卷积神经网络图像识别车辆类型

卷积神经网络图像识别车辆类型 1、图像 自行车: 汽车: 摩托车: 2、数据集目录 3、流程 1、获取数据,把图像转成矩阵,并随机划分训练集、测试集 2、把标签转为数值,将标签向量转换为二值矩阵 3、图像数据归一化,0-1之间的值 4、构造卷积神经网络 5、设置图像输入…...

【接口设计】用 Swagger 实现接口文档

用 Swagger 实现接口文档 1.配置 Swagger1.1 添加 Swagger 依赖1.2 创建 Swagger 配置类 2.编写接口文档 在项目开发中,一般都是由前后端工程师共同定义接口,编写接口文档,之后大家根据这个接口文档进行开发、维护。为了便于编写和维护稳定&a…...

TensorFlow系列:第四讲:MobileNetV2实战

一. 加载数据集 编写工具类,实现数据集的加载 import keras""" 加载数据集工具类 """class DatasetLoader:def __init__(self, path_url, image_size(224, 224), batch_size32, class_modecategorical):self.path_url path_urlself…...

Redis+Caffeine 实现两级缓存实战

RedisCaffeine 实现两级缓存 背景 ​ 事情的开始是这样的,前段时间接了个需求,给公司的商城官网提供一个查询预计送达时间的接口。接口很简单,根据请求传的城市仓库发货时间查询快递的预计送达时间。因为商城下单就会调用这个接口&#xff…...

SpringBoot:SpringBoot中如何实现对Http接口进行监控

一、前言 Spring Boot Actuator是Spring Boot提供的一个模块,用于监控和管理Spring Boot应用程序的运行时信息。它提供了一组监控端点(endpoints),用于获取应用程序的健康状态、性能指标、配置信息等,并支持通过 HTTP …...

STM32-I2C硬件外设

本博文建议与我上一篇I2C 通信协议​​​​​​共同理解 合成一套关于I2C软硬件体系 STM32内部集成了硬件I2C收发电路,可以由硬件自动执行时钟生成、起始终止条件生成、应答位收发、数据收发等功能,减轻CPU的负担 特点: 多主机功能&#x…...

暑假第一次作业

第一步:给R1,R2,R3,R4配IP [R1-GigabitEthernet0/0/0]ip address 192.168.1.1 24 [R1-Serial4/0/0]ip address 15.0.0.1 24 [R2-GigabitEthernet0/0/0]ip address 192.168.2.1 24 [R2-Serial4/0/0]ip address 25.0.0.1 24 [R3-GigabitEthernet0/0/0]ip address 192.…...

【算法专题】快速排序

1. 颜色分类 75. 颜色分类 - 力扣(LeetCode) 依据题意,我们需要把只包含0、1、2的数组划分为三个部分,事实上,在我们前面学习过的【算法专题】双指针算法-CSDN博客中,有一道题叫做移动零,题目要…...

debian 12 PXE Server 批量部署系统

pxe server 前言 PXE(Preboot eXecution Environment,预启动执行环境)是一种网络启动协议,允许计算机通过网络启动而不是使用本地硬盘。PXE服务器是实现这一功能的服务器,它提供了启动镜像和引导加载程序,…...

【Pytorch】RNN for Image Classification

文章目录 1 RNN 的定义2 RNN 输入 input, h_03 RNN 输出 output, h_n4 多层5 小试牛刀 学习参考来自 pytorch中nn.RNN()总结RNN for Image Classification(RNN图片分类–MNIST数据集)pytorch使用-nn.RNNBuilding RNNs is Fun with PyTorch and Google Colab 1 RNN 的定义 nn.…...

基于Java的飞机大战游戏的设计与实现论文

点击下载源码 基于Java的飞机大战游戏的设计与实现 摘 要 现如今,随着智能手机的兴起与普及,加上4G(the 4th Generation mobile communication ,第四代移动通信技术)网络的深入,越来越多的IT行业开始向手机…...

初识影刀:EXCEL根据部门筛选低值易耗品

第一次知道这个办公自动化的软件还是在招聘网站上,了解之后发现对于办公中重复性的工作还是挺有帮助的,特别是那些操作非EXCEL的重复性工作,当然用在EXCEL上更加方便,有些操作比写VBA便捷。 下面就是一个了解基本操作后&#xff…...

nginx的四层负载均衡实战

目录 1 环境准备 1.1 mysql 部署 1.2 nginx 部署 1.3 关闭防火墙和selinux 2 nginx配置 2.1 修改nginx主配置文件 2.2 创建stream配置文件 2.3 重启nginx 3 测试四层代理是否轮循成功 3.1 远程链接通过代理服务器访问 3.2 动图演示 4 四层反向代理算法介绍 4.1 轮询&#xff0…...

中职网络安全B模块Cenots6.8数据库

任务环境说明: ✓ 服务器场景:CentOS6.8(开放链接) ✓ 用户名:root;密码:123456 进入虚拟机操作系统:CentOS 6.8,登陆数据库(用户名:root&#x…...

BGP笔记的基本概要

技术背景: 在只有IGP(诸如OSPF、IS-IS、RIP等协议,因为最初是被设计在一个单域中进行一个路由操纵,因此被统一称为Interior Gateway Protocol,内部网关协议)的时代,域间路由无法实现一个全局路由…...

【Redis】复制(Replica)

文章目录 一、复制是什么?二、 基本命令三、 配置(分为配置文件和命令配置)3.1 配置文件3.2 命令配置3.3 嵌套连接3.4 关闭从属关系 四、 复制原理五、 缺点 以下是本篇文章正文内容 一、复制是什么? 主从复制 master&#xff…...

封装了一个仿照抖音效果的iOS评论弹窗

需求背景 开发一个类似抖音评论弹窗交互效果的弹窗,支持滑动消失, 滑动查看评论 效果如下图 思路 创建一个视图,该视图上面放置一个tableView, 该视图上添加一个滑动手势,同时设置代理,实现代理方法 (BOOL)gestur…...

基于FPGA的PID算法学习———实现PID比例控制算法

基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容:参考网站: PID算法控制 PID即:Proportional(比例)、Integral(积分&…...

在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能

下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能,包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...

阿里云ACP云计算备考笔记 (5)——弹性伸缩

目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...

Module Federation 和 Native Federation 的比较

前言 Module Federation 是 Webpack 5 引入的微前端架构方案,允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...

UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)

UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中,UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化&#xf…...

ArcGIS Pro制作水平横向图例+多级标注

今天介绍下载ArcGIS Pro中如何设置水平横向图例。 之前我们介绍了ArcGIS的横向图例制作:ArcGIS横向、多列图例、顺序重排、符号居中、批量更改图例符号等等(ArcGIS出图图例8大技巧),那这次我们看看ArcGIS Pro如何更加快捷的操作。…...

React---day11

14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store: 我们在使用异步的时候理应是要使用中间件的,但是configureStore 已经自动集成了 redux-thunk,注意action里面要返回函数 import { configureS…...

HarmonyOS运动开发:如何用mpchart绘制运动配速图表

##鸿蒙核心技术##运动开发##Sensor Service Kit(传感器服务)# 前言 在运动类应用中,运动数据的可视化是提升用户体验的重要环节。通过直观的图表展示运动过程中的关键数据,如配速、距离、卡路里消耗等,用户可以更清晰…...

Git常用命令完全指南:从入门到精通

Git常用命令完全指南:从入门到精通 一、基础配置命令 1. 用户信息配置 # 设置全局用户名 git config --global user.name "你的名字"# 设置全局邮箱 git config --global user.email "你的邮箱example.com"# 查看所有配置 git config --list…...

Bean 作用域有哪些?如何答出技术深度?

导语: Spring 面试绕不开 Bean 的作用域问题,这是面试官考察候选人对 Spring 框架理解深度的常见方式。本文将围绕“Spring 中的 Bean 作用域”展开,结合典型面试题及实战场景,帮你厘清重点,打破模板式回答&#xff0c…...