后端任务调度:full-stack-fastapi-template中的Celery集成
在现代Web应用开发中,后端任务调度是处理异步任务、定时任务的关键组件。full-stack-fastapi-template作为一个全栈FastAPI模板,虽然原生未直接集成Celery,但本文将详细介绍如何在该项目中集成Celery实现高效的任务调度。## 项目结构概览full-stack-fastapi-template采用前后端分离架构,主要分为以下几个核心目录:- **后端代...
后端任务调度:full-stack-fastapi-template中的Celery集成
在现代Web应用开发中,后端任务调度是处理异步任务、定时任务的关键组件。full-stack-fastapi-template作为一个全栈FastAPI模板,虽然原生未直接集成Celery,但本文将详细介绍如何在该项目中集成Celery实现高效的任务调度。
项目结构概览
full-stack-fastapi-template采用前后端分离架构,主要分为以下几个核心目录:
- 后端代码:backend/
- 前端代码:frontend/
- 部署配置:docker-compose.yml
- 开发文档:development.md
项目整体架构如图所示:
Celery集成准备工作
环境依赖安装
首先需要在后端项目中添加Celery及相关依赖。编辑backend/pyproject.toml文件,添加以下依赖:
celery = "^5.3.4"
redis = "^4.5.5"
flower = "^2.0.1"
然后通过Poetry安装依赖:
cd backend && poetry add celery redis flower
目录结构调整
为Celery集成创建必要的目录结构:
backend/
├── app/
│ ├── tasks/ # 任务定义目录
│ │ ├── __init__.py
│ │ └── example.py # 示例任务
│ └── core/
│ └── celery.py # Celery配置
Celery核心配置实现
创建Celery实例
在backend/app/core/celery.py中添加以下配置:
from celery import Celery
from app.core.config import settings
celery_app = Celery(
"worker",
backend=settings.REDIS_URL,
broker=settings.REDIS_URL,
include=["app.tasks.example"]
)
celery_app.conf.update(
task_track_started=True,
task_time_limit=30 * 60, # 30分钟超时
result_expires=24 * 60 * 60, # 结果保留24小时
)
环境变量配置
修改backend/app/core/config.py,添加Redis配置:
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
# ... 其他配置 ...
REDIS_URL: str = "redis://redis:6379/0"
任务定义与使用示例
创建示例任务
在backend/app/tasks/example.py中定义任务:
from app.core.celery import celery_app
import time
@celery_app.task
def process_data(data: dict) -> dict:
"""处理数据的示例任务"""
time.sleep(10) # 模拟耗时操作
result = {"status": "completed", "data": data, "processed_at": time.time()}
return result
@celery_app.task(bind=True, max_retries=3)
def send_notification(self, email: str, message: str):
"""发送通知的任务,支持重试"""
try:
# 实际发送邮件的逻辑
from app.utils import send_email
send_email(email, "Notification", message)
return {"status": "success", "email": email}
except Exception as e:
self.retry(exc=e, countdown=5) # 5秒后重试
在API中调用任务
修改backend/app/api/routes/items.py,添加任务调用接口:
from fastapi import APIRouter, Depends, HTTPException
from app.tasks.example import process_data, send_notification
from app.schemas.item import ItemCreate, ItemResponse
router = APIRouter()
@router.post("/process", response_model=ItemResponse)
def create_process_task(item: ItemCreate):
"""创建数据处理任务"""
task = process_data.delay(item.dict())
return {
"task_id": task.id,
"status": "pending",
"message": "任务已提交"
}
任务监控与管理
Docker配置
修改docker-compose.yml,添加Redis、Celery Worker和Flower服务:
services:
redis:
image: redis:6-alpine
ports:
- "6379:6379"
celery_worker:
build: ./backend
command: poetry run celery -A app.core.celery worker --loglevel=info
depends_on:
- backend
- redis
celery_flower:
build: ./backend
command: poetry run celery -A app.core.celery flower --port=5555
ports:
- "5555:5555"
depends_on:
- backend
- redis
启动与监控
启动所有服务:
docker-compose up -d
访问Flower监控界面:http://localhost:5555
最佳实践与注意事项
-
任务设计原则
- 保持任务幂等性,确保重复执行安全
- 任务参数尽量简单,避免传递大型对象
- 长时间运行的任务应拆分为多个小任务
-
错误处理策略
@celery_app.task(bind=True, max_retries=3, retry_backoff=2) def robust_task(self, param): try: # 业务逻辑 except Exception as e: self.retry(exc=e, countdown=10) # 指数退避重试 -
性能优化
- 使用任务路由将不同类型任务分配给专用worker
- 合理设置预取计数(prefetch_count)
- 定期清理过期任务结果
总结与扩展
通过本文介绍的方法,我们成功在full-stack-fastapi-template中集成了Celery任务调度系统。这一集成方案可以处理各种异步任务需求,包括:
- 邮件发送(backend/app/email-templates/)
- 数据处理和报表生成
- 定时任务和周期性作业
- 第三方API调用
完整的项目代码结构可参考项目根目录,更多开发细节请查阅development.md。
对于高级应用场景,可以进一步扩展:
- 实现任务优先级队列
- 集成任务进度跟踪
- 添加任务结果缓存
- 实现分布式任务处理
更多推荐



所有评论(0)