在FastAPI中实现背景任务可以使用Python的asyncio库来实现。以下是一个简单的示例代码:
from fastapi import BackgroundTasks, FastAPI
import asyncio
app = FastAPI()
def background_task():
# 模拟一个长时间运行的任务
asyncio.sleep(5)
print("Background task completed")
@app.post("/send-notification/{message}")
async def send_notification(message: str, background_tasks: BackgroundTasks):
background_tasks.add_task(background_task)
return {"message": f"Notification '{message}' sent in the background"}
在上面的示例中,我们定义了一个背景任务background_task,它模拟了一个长时间运行的任务。然后我们定义了一个路由/send-notification/{message},当用户访问这个路由时,会触发发送通知的操作,并将背景任务background_task添加到BackgroundTasks中。这样在接收到请求后,就会异步执行这个背景任务,不会阻塞主线程。
请注意,需要在启动应用程序时运行uvicorn服务器时添加--reload参数,以便在代码更改时重新加载应用程序。