后端任务调度:full-stack-fastapi-template中的Celery集成

【免费下载链接】full-stack-fastapi-template 【免费下载链接】full-stack-fastapi-template 项目地址: https://gitcode.com/gh_mirrors/fu/full-stack-fastapi-template

在现代Web应用开发中,后端任务调度是处理异步任务、定时任务的关键组件。full-stack-fastapi-template作为一个全栈FastAPI模板,虽然原生未直接集成Celery,但本文将详细介绍如何在该项目中集成Celery实现高效的任务调度。

项目结构概览

full-stack-fastapi-template采用前后端分离架构,主要分为以下几个核心目录:

项目整体架构如图所示:

项目架构

Celery集成准备工作

环境依赖安装

首先需要在后端项目中添加Celery及相关依赖。编辑backend/pyproject.toml文件,添加以下依赖:

celery = "^5.3.4"
redis = "^4.5.5"
flower = "^2.0.1"

然后通过Poetry安装依赖:

cd backend && poetry add celery redis flower

目录结构调整

为Celery集成创建必要的目录结构:

backend/
├── app/
│   ├── tasks/           # 任务定义目录
│   │   ├── __init__.py
│   │   └── example.py   # 示例任务
│   └── core/
│       └── celery.py    # Celery配置

Celery核心配置实现

创建Celery实例

在backend/app/core/celery.py中添加以下配置:

from celery import Celery
from app.core.config import settings

celery_app = Celery(
    "worker",
    backend=settings.REDIS_URL,
    broker=settings.REDIS_URL,
    include=["app.tasks.example"]
)

celery_app.conf.update(
    task_track_started=True,
    task_time_limit=30 * 60,  # 30分钟超时
    result_expires=24 * 60 * 60,  # 结果保留24小时
)

环境变量配置

修改backend/app/core/config.py,添加Redis配置:

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    # ... 其他配置 ...
    REDIS_URL: str = "redis://redis:6379/0"

任务定义与使用示例

创建示例任务

在backend/app/tasks/example.py中定义任务:

from app.core.celery import celery_app
import time

@celery_app.task
def process_data(data: dict) -> dict:
    """处理数据的示例任务"""
    time.sleep(10)  # 模拟耗时操作
    result = {"status": "completed", "data": data, "processed_at": time.time()}
    return result

@celery_app.task(bind=True, max_retries=3)
def send_notification(self, email: str, message: str):
    """发送通知的任务,支持重试"""
    try:
        # 实际发送邮件的逻辑
        from app.utils import send_email
        send_email(email, "Notification", message)
        return {"status": "success", "email": email}
    except Exception as e:
        self.retry(exc=e, countdown=5)  # 5秒后重试

在API中调用任务

修改backend/app/api/routes/items.py,添加任务调用接口:

from fastapi import APIRouter, Depends, HTTPException
from app.tasks.example import process_data, send_notification
from app.schemas.item import ItemCreate, ItemResponse

router = APIRouter()

@router.post("/process", response_model=ItemResponse)
def create_process_task(item: ItemCreate):
    """创建数据处理任务"""
    task = process_data.delay(item.dict())
    return {
        "task_id": task.id,
        "status": "pending",
        "message": "任务已提交"
    }

任务监控与管理

Docker配置

修改docker-compose.yml,添加Redis、Celery Worker和Flower服务:

services:
  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"

  celery_worker:
    build: ./backend
    command: poetry run celery -A app.core.celery worker --loglevel=info
    depends_on:
      - backend
      - redis

  celery_flower:
    build: ./backend
    command: poetry run celery -A app.core.celery flower --port=5555
    ports:
      - "5555:5555"
    depends_on:
      - backend
      - redis

启动与监控

启动所有服务:

docker-compose up -d

访问Flower监控界面:http://localhost:5555

Celery监控界面

最佳实践与注意事项

  1. 任务设计原则

    • 保持任务幂等性,确保重复执行安全
    • 任务参数尽量简单,避免传递大型对象
    • 长时间运行的任务应拆分为多个小任务
  2. 错误处理策略

    @celery_app.task(bind=True, max_retries=3, retry_backoff=2)
    def robust_task(self, param):
        try:
            # 业务逻辑
        except Exception as e:
            self.retry(exc=e, countdown=10)  # 指数退避重试
    
  3. 性能优化

    • 使用任务路由将不同类型任务分配给专用worker
    • 合理设置预取计数(prefetch_count)
    • 定期清理过期任务结果

总结与扩展

通过本文介绍的方法,我们成功在full-stack-fastapi-template中集成了Celery任务调度系统。这一集成方案可以处理各种异步任务需求,包括:

完整的项目代码结构可参考项目根目录,更多开发细节请查阅development.md

对于高级应用场景,可以进一步扩展:

  • 实现任务优先级队列
  • 集成任务进度跟踪
  • 添加任务结果缓存
  • 实现分布式任务处理

【免费下载链接】full-stack-fastapi-template 【免费下载链接】full-stack-fastapi-template 项目地址: https://gitcode.com/gh_mirrors/fu/full-stack-fastapi-template

Logo

加入社区!打开量化的大门,首批课程上线啦!

更多推荐