前言

在现代编程中,异步编程已成为提高程序效率和性能的重要方式。

Python 作为一种流行的编程语言,自然也提供了强大的异步编程支持。

本文将详细介绍 Python 中的协程,以及 async def、async for、await 和 yield 等关键字的使用。


协程简介

协程是一种比传统函数更高级的控制结构。

它们在一个过程中暂停,然后在另一个地方恢复执行

协程可以在程序的多个点之间切换,从而实现并发执行,而无需多线程或多进程的开销。


协程 vs 线程

与线程不同,协程由程序员手动控制其切换。

线程在操作系统级别进行调度,可能导致频繁的上下文切换开销。

协程则由 Python 解释器调度,开销更低,且不会发生竞争资源的问题。


一、async def 和 await

Python 3.5及之后版本中,引入了 asyncawait 关键字,使得定义和调用协程变得更为简洁和直观。

async def

async def 用于定义一个协程函数。

与普通函数不同,协程函数在调用时不会立即执行,而是返回一个协程对象,直到被 await 调用时才会运行。

import asyncio


async def my_coroutine():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# 调用协程函数
coroutine = my_coroutine()

await

await 用于暂停协程的执行,等待另一个协程完成,并获取其结果。

await 后面必须跟随一个可等待对象。

协程、Future对象或其他实现了 __await__方法的对象。

async def main():
    print("Start")
    await my_coroutine()
    print("End")

# 运行主协程
# asyncio.run(main())
  • 在上面的示例中,await my_coroutine() 会暂停 main 的执行,直到 my_coroutine 运行结束。
  • asyncio.run 这个函数是 Python 3.7 之后才有的特性。
  • 可以让 Python 的协程接口变得非常简单,一个好的编程规范是,asyncio.run(main()) 作为主程序的入口函数,在程序运行周期内,只调用一次 asyncio.run

二、async for 和 async with

Python 3.6 引入了 async for 和 async with,使得异步迭代和上下文管理变得更加方便。

async for

async for 用于异步迭代可等待对象的异步迭代器。

它的工作方式类似于普通的 for 循环,但可以在异步环境中使用。

class AsyncIterator:
    def __init__(self):
        self.count = 0


    async def __aiter__(self):
        return self


    async def __anext__(self):
        if self.count < 5:
            self.count += 1
            return self.count
        else:
            raise StopAsyncIteration


async def async_for_example():
    async for number in AsyncIterator():
        print(number)


asyncio.run(async_for_example())

async with

async with 用于异步上下文管理器。

它的作用与 with 语句类似,但适用于异步环境,确保在异步操作前后执行特定的设置和清理操作。

class AsyncContextManager:
    async def __aenter__(self):
        print("Enter context")
        return self


    async def __aexit__(self, exc_type, exc, tb):
        print("Exit context")


async def async_with_example():
    async with AsyncContextManager() as manager:
        print("Inside context")


asyncio.run(async_with_example())

三、yield 和 yield from

yield 和 yield from 是生成器相关的关键字。

但它们也可以用于协程中,尤其是在生成器协程(Python 3.3之前的异步实现)中。

yield

yield用于定义生成器函数。

生成器函数在每次 yield 语句处暂停,并在下次调用 next() 方法时继续执行。

def simple_generator():
    yield 1
    yield 2
    yield 3


for value in simple_generator():
    print(value)

yield from

yield from 用于委派生成器,允许一个生成器将部分操作委托给另一个生成器。

def generator1():
    yield 1
    yield 2


def generator2():
    yield from generator1()
    yield 3


for value in generator2():
    print(value)

在异步编程中,yieldyield from 也可以用于异步生成器和异步迭代器。


四、create_task 和 gather

asyncio.create_task 和 asyncio.gather 是两个重要的工具,用于并发运行多个协程。

asyncio.create_task

asyncio.create_task 用于将协程包装成任务,使其能够在事件循环中并发运行。

import asyncio

async def task1():
    await asyncio.sleep(1)
    print("Task 1 completed")

async def task2():
    await asyncio.sleep(2)
    print("Task 2 completed")

async def main():
    task1_task = asyncio.create_task(task1())
    task2_task = asyncio.create_task(task2())
    
    # 等待所有任务完成
    await task1_task
    await task2_task

asyncio.run(main())

在这个示例中,我们创建了两个任务 task1task2
并通过 asyncio.create_task 将它们包装成可并发运行的任务。
然后,我们使用 await 等待所有任务完成。

asyncio.gather

asyncio.gather 用于并行运行多个协程,并收集它们的结果。

它比 create_task 更加方便,尤其是当我们需要同时运行多个任务并获取它们的结果时。

import asyncio

async def task1():
    await asyncio.sleep(1)
    print("Task 1 completed")
    return "Result 1"

async def task2():
    await asyncio.sleep(2)
    print("Task 2 completed")
    return "Result 2"

async def main():
    results = await asyncio.gather(task1(), task2())
    print(results)

asyncio.run(main())

在这个示例中,asyncio.gather 并行运行 task1task2
并在所有任务完成后返回一个包含结果的列表,这样我们可以更方便地管理和处理多个协程任务。

异常处理与取消任务

在实际应用中,协程可能会抛出异常,或者需要在执行过程中取消某些任务。

我们可以通过 asyncio.gatherreturn_exceptions 参数来收集异常,同时也可以使用 cancel 方法来取消任务。

import asyncio

async def worker_1():
    await asyncio.sleep(1)
    return 1

async def worker_2():
    await asyncio.sleep(2)
    return 2 / 0

async def worker_3():
    await asyncio.sleep(3)
    return 3

async def main():
    task_1 = asyncio.create_task(worker_1())
    task_2 = asyncio.create_task(worker_2())
    task_3 = asyncio.create_task(worker_3())

    await asyncio.sleep(2)
    task_3.cancel()

    res = await asyncio.gather(task_1, task_2, task_3, return_exceptions=True)
    print(res)

# 输出: [1, ZeroDivisionError('division by zero'), CancelledError()]
asyncio.run(main())
  • 在这个示例中,我们创建了三个任务 worker_1、worker_2 和 worker_3
  • 其中 worker_2 会抛出一个除零异常,而 worker_3 会在执行过程中被取消。
  • 我们使用 asyncio.gather 来收集所有任务的结果,并通过 return_exceptions=True 参数捕获所有异常。
  • 最终的输出包含了正常完成任务的结果、抛出的异常以及取消任务的状态。

通过 asyncio.create_taskasyncio.gather,我们可以有效地并行运行多个协程任务,极大地提高程序的并发性能。
这在处理大量I/O操作或需要同时执行多个独立任务的场景中尤为重要。


五、并发度控制asyncio.Semaphore

asyncio.SemaphorePythonasyncio 模块中的一个重要工具,用于控制并发任务的数量。

它在处理大量并发操作时尤为重要,尤其是在需要限制同时运行的任务数量以避免过载或超出限制的场景中。

asyncio.Semaphore 是一种异步互斥量,允许在同一时间内有固定数量的任务访问某个资源。

它可以帮助你在异步编程中控制并发级别,防止系统过载或超出外部服务的限制。

工作原理

1. 初始化:

semaphore = asyncio.Semaphore(value)
  • value 表示信号量的初始值,也即允许同时运行的任务数量。
  • 默认值是 1,表示互斥量(类似于锁)。

2. 获取信号量:

async with semaphore:
       # 受控的代码块
  • async with semaphore 是异步上下文管理器,获取信号量(即允许继续执行的许可证)并进入受控的代码块。
  • 当代码块执行完毕时,信号量会自动释放,使其他任务能够继续执行。

3. 释放信号量:

  • async with 语句块结束时,信号量会自动释放。
  • 这确保了每个获取信号量的操作都有一个匹配的释放操作。
使用场景
  • 控制并发任务数量:

使用 asyncio.Semaphore 来限制同时进行的任务数量。例如,当处理大量网络请求时,控制并发度可以防止超出 API 的速率限制或避免过载。

  • 避免资源争用:

当多个任务访问共享资源时,信号量可以确保资源访问的有序性和一致性,避免资源争用问题。

示例

以下是一个简单的示例,展示了如何使用 asyncio.Semaphore 来限制同时运行的任务数量:

import asyncio

async def worker(semaphore, worker_id):
    async with semaphore:
        print(f"Worker {worker_id} is working")
        await asyncio.sleep(1)
        print(f"Worker {worker_id} has finished")

async def main():
    semaphore = asyncio.Semaphore(3)  # Limit concurrency to 3
    tasks = [worker(semaphore, i) for i in range(10)]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())

在这个例子中,Semaphore(3) 限制了最多同时运行 3worker 任务。

当有更多任务时,它们必须等待直到有信号量可用。

因此上述代码的运行日志为:

Worker 0 is working
Worker 1 is working
Worker 2 is working
Worker 0 has finished
Worker 1 has finished
Worker 2 has finished
Worker 3 is working
Worker 4 is working
Worker 5 is working
Worker 3 has finished
Worker 4 has finished
Worker 5 has finished
Worker 6 is working
Worker 7 is working
Worker 8 is working
Worker 6 has finished
Worker 7 has finished
Worker 8 has finished
Worker 9 is working
Worker 9 has finished

总结

asyncio.Semaphore 是控制异步操作并发度的一个强大工具,它能够有效管理任务并发,避免超载和资源争用。

理解和正确使用信号量可以帮助你在异步编程中实现更高效、更可靠的代码。


六、协程与生成器的关系

协程与生成器有很多相似之处,都能够在函数执行过程中暂停并恢复,但它们的设计目的和使用场景有所不同。

相似之处

暂停与恢复:两者都可以在执行过程中暂停,并在之后恢复。

关键字:协程使用 await 暂停执行,生成器使用 yield 暂停执行。

不同之处

生成器:主要用于生成一系列值,常用于迭代。

协程:主要用于处理异步操作,管理并发任务。

生成器:使用 yield 关键字。

协程:使用 async def 定义,await 关键字用于暂停。

控制流:

生成器:由调用方(迭代器)控制。

协程:由事件循环控制。

结合使用

在某些情况下,可以结合使用生成器和协程。

例如,在异步生成器中使用 yield 生成值,并使用 await 等待异步操作完成。

async def async_generator():
    for i in range(5):
        await asyncio.sleep(1)
        yield i

async def main():
    async for value in async_generator():
        print(value)

asyncio.run(main())

在这个示例中,我们定义了一个异步生成器函数 async_generator,它每秒生成一个值,并在主协程中异步迭代这些值。


七、实际应用场景

异步编程在实际中有广泛的应用,尤其是在处理I/O密集型任务时,如网络请求、文件操作等。

通过异步编程,可以在等待I/O操作时执行其他任务,从而提高程序的并发性能。

异步网络请求

import aiohttp


async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()


async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'http://example.com')
        print(html)


asyncio.run(main())

在这个示例中,我们使用 aiohttp 库进行异步网络请求,大大提高了效率。

异步文件操作

import aiofiles


async def read_file(filename):
    async with aiofiles.open(filename, 'r') as f:
        contents = await f.read()
        print(contents)


asyncio.run(read_file('example.txt'))

通过 aiofiles 库,我们可以实现异步的文件读写操作,提高文件I/O操作的性能。

异步大模型流式服务

大模型(如GPT-4)相关的应用中,流式服务是一种常见的需求。
通过异步编程,可以实现高效的流式数据处理,提高服务响应速度。

import asyncio


async def stream_handler(reader, writer):
    while True:
        data = await reader.read(100)
        if not data:
            break
        print(f"Received: {data.decode()}")
        response = f"Echo: {data.decode()}"
        writer.write(response.encode())
        await writer.drain()
    writer.close()
    await writer.wait_closed()


async def main():
    server = await asyncio.start_server(stream_handler, '127.0.0.1', 8888)
    async with server:
        await server.serve_forever()


asyncio.run(main())

在这个示例中,我们使用 asyncio 库创建了一个简单的流式服务。

客户端发送的数据会被接收并立即返回给客户端,实现了基本的流式处理功能。


八、总结

Python的协程和异步编程为开发高效的并发程序提供了强大的工具。

通过 async def、await、async for 和 async with 等关键字,我们可以编写简洁、易读的异步代码。

理解和熟练应用这些关键字,将大大提高你的编程效率和程序性能。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐