在FastAPI中实现背景任务可以使用Python的asyncio
库来实现。以下是一个简单的示例代码:
from fastapi import BackgroundTasks, FastAPI
import asyncio
app = FastAPI()
def background_task():
# 模拟一个长时间运行的任务
asyncio.sleep(5)
print("Background task completed")
@app.post("/send-notification/{message}")
async def send_notification(message: str, background_tasks: BackgroundTasks):
background_tasks.add_task(background_task)
return {"message": f"Notification '{message}' sent in the background"}
在上面的示例中,我们定义了一个背景任务background_task
,它模拟了一个长时间运行的任务。然后我们定义了一个路由/send-notification/{message}
,当用户访问这个路由时,会触发发送通知的操作,并将背景任务background_task
添加到BackgroundTasks
中。这样在接收到请求后,就会异步执行这个背景任务,不会阻塞主线程。
请注意,需要在启动应用程序时运行uvicorn
服务器时添加--reload
参数,以便在代码更改时重新加载应用程序。