Python混合如何使用同步和异步函数

发布时间:2023-05-12 15:38:45 作者:iii
来源:亿速云 阅读:124

这篇文章主要介绍“Python混合如何使用同步和异步函数”,在日常操作中,相信很多人在Python混合如何使用同步和异步函数问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Python混合如何使用同步和异步函数”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

在协程函数中调用同步函数

在协程函数中直接调用同步函数会阻塞事件循环,从而影响整个程序的性能。我们先来看一个例子:

以下是使用异步 Web 框架 FastAPI 写的一个例子,FastAPI 是比较快,但不正确的操作将会变得很慢。

import time

from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def root():
    time.sleep(10)
    return {"message": "Hello World"}


@app.get("/health")
async def health():
    return {"status": "ok"}

上面我们写了两个接口,假设 root 接口函数耗时 10 秒,在这 10 秒内访问 health 接口,想一想会发生什么?

Python混合如何使用同步和异步函数

访问 root 接口(左),立即访问 health 接口(右),health 接口被阻塞,直至 root 接口返回后,health 接口才成功响应。

time.sleep 就是一个「同步」函数,它会阻塞整个事件循环。

如何解决呢?想一想以前的处理方法,如果一个函数会阻塞主线程,那么就再开一个线程让这个阻塞函数单独运行。所以,这里也是同理,开一个线程单独去运行那些阻塞式操作,比如读取文件等。

loop.run_in_executor 方法将同步函数转换为异步非阻塞方式进行处理。具体来说,loop.run_in_executor() 可以将同步函数创建为一个线程进程,并在其中执行该函数,从而避免阻塞事件循环。

官方例子:在线程或者进程池中执行代码。

那么,我们使用 loop.run_in_executor 改写上面例子,如下:

import asyncio
import time

from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def root():
    loop = asyncio.get_event_loop()

    def do_blocking_work():
        time.sleep(10)
        print("Done blocking work!!")

    await loop.run_in_executor(None, do_blocking_work)
    return {"message": "Hello World"}


@app.get("/health")
async def health():
    return {"status": "ok"}

效果如下:

Python混合如何使用同步和异步函数

root 接口被阻塞期间,health 依然正常访问互不影响。

注意: 这里都是为了演示,实际在使用 FastAPI 开发时,你可以直接将 async def root 更换成 def root ,也就是将其换成同步接口函数,FastAPI 内部会自动创建线程处理这个同步接口函数。总的来说,FastAPI 内部也是依靠线程去处理同步函数从而避免阻塞主线程(或主线程中的事件循环)。

在同步函数中调用异步函数

协程只能在「事件循环」内被执行,且同一时刻只能有一个协程被执行。

所以,在同步函数中调用异步函数,其本质就是将协程「扔进」事件循环中,等待该协程执行完获取结果即可。

以下这些函数,都可以实现这个效果:

接下来,我们将一一讲解这些方法并举例说明。

asyncio.run

这个方法使用起来最简单,先看下如何使用,然后紧跟着讲一下哪些场景不能直接使用 asyncio.run

import asyncio

async def do_work():
    return 1

def main():
    result = asyncio.run(do_work())
    print(result)  # 1

if __name__ == "__main__":
    main()

直接 run 就完事了,然后接受返回值即可。

但是需要,注意的是 asyncio.run 每次调用都会新开一个事件循环,当结束时自动关闭该事件循环。

一个线程内只存在一个事件循环,所以如果当前线程已经有存在的事件循环了,就不应该使用 asyncio.run 了,否则就会抛出如下异常:

RuntimeError: asyncio.run() cannot be called from a running event loop

因此,asyncio.run 用作新开一个事件循环时使用。

asyncio.run_coroutine_threadsafe

向指定事件循环提交一个协程。(线程安全)
返回一个 concurrent.futures.Future 以等待来自其他 OS 线程的结果。

换句话说,就是将协程丢给其他线程中的事件循环去运行

值得注意的是这里的「事件循环」应该是其他线程中的事件循环,非当前线程的事件循环。

其返回的结果是一个 future 对象,如果你需要获取协程的执行结果可以使用 future.result() 获取

下方给了一个例子,一共有两个线程:thread_with_loopanother_thread,分别用于启动事件循环和调用 run_coroutine_threadsafe

import asyncio
import threading
import time

loop = None


def get_loop():
    global loop
    if loop is None:
        loop = asyncio.new_event_loop()
    return loop


def another_thread():
    async def coro_func():
        return 1

    loop = get_loop()
    # 将协程提交到另一个线程的事件循环中执行
    future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
    # 等待协程执行结果
    print(future.result())
    # 停止事件循环
    loop.call_soon_threadsafe(loop.stop)


def thread_with_loop():
    loop = get_loop()
    # 启动事件循环,确保事件循环不会退出,直到 loop.stop() 被调用
    loop.run_forever()
    loop.close()


# 启动一个线程,线程内部启动了一个事件循环
threading.Thread(target=thread_with_loop).start()
time.sleep(1)
# 在主线程中启动一个协程, 并将协程提交到另一个线程的事件循环中执行
t = threading.Thread(target=another_thread)
t.start()
t.join()
loop.run_until_complete

运行直到 future ( Future 的实例 ) 被完成。

这个方法和 asyncio.run 类似。

具体就是传入一个协程对象或者任务,然后可以直接拿到协程的返回值。

run_until_complete 属于 loop 对象的方法,所以这个方法的使用前提是有一个事件循环,注意这个事件循环必须是非运行状态,如果是运行中就会抛出如下异常:

RuntimeError: This event loop is already running

例子:

loop = asyncio.new_event_loop()
loop.run_until_complete(do_async_work())
create_task

再次准确一点:要运行一个协程函数的本质是将携带协程函数的任务提交至事件循环中,由事件循环发现、调度并执行。

其实一共就是满足两个条件:

我们使用 async def func 定义的函数叫做协程函数func() 这样调用之后返回的结果是协程对象,到这一步协程函数内的代码都没有被执行,直到协程对象被包装成了任务,事件循环才会“正眼看它们”。

所以事件循环调度运行的基本单元就是任务,那为什么我们在使用 async/await 这些语句时没有涉及到任务这个概念呢?

这是因为 await 语法糖在内部将协程对象封装成了任务,再次强调事件循环只认识任务

所以,想要运行一个协程对象,其实就是将协程对象封装成一个任务,至于事件循环是如何发现、调度和执行的,这个我们不用关心。

那将协程封装成的任务的方法有哪些呢?

看着有好几个的,没关系,我们只关心 loop.create_task,因为其他方法最终都是调用 loop.create_task

使用起来也是很简单的,将协程对象传入,返回值是一个任务对象。

async def do_work():
    return 222

task = loop.create_task(do_work())

do_work 会被异步执行,那么 do_work 的结果怎么获取呢,task.result() 可以吗?

分情况:

asyncio.Task 运行使用 add_done_callback 添加完成时的回调函数,所以我们可以「曲线救国」,使用回调函数将结果添加到队列、Future 等等。

我这里给个基于 concurrent.futures.Future 获取结果的例子,如下:

import asyncio
from asyncio import Task
from concurrent.futures import Future

from fastapi import FastAPI

app = FastAPI()
loop = asyncio.get_event_loop()


async def do_work1():
    return 222


@app.get("/")
def root():
    # 新建一个 future 对象,用于接受结果值
    future = Future()

    # 提交任务至事件循环
    task = loop.create_task(do_work1())

    # 回调函数
    def done_callback(task: Task):
        # 设置结果
        future.set_result(task.result())

    # 为这个任务添加回调函数
    task.add_done_callback(done_callback)

    # future.result 会被阻塞,直到有结果返回为止
    return future.result()  # 222

到此,关于“Python混合如何使用同步和异步函数”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

推荐阅读:
  1. 如何使用Python日期库pendulum来处理日期和时间
  2. 怎么查看Python脚本所依赖的第三方库及其版本

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

python

上一篇:python如何实现简易的学生信息管理系统

下一篇:Python迭代器如何创建使用

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》