Python实现异步编程的重要方式【协程(Coroutine)函数】(内含详细案例)
协程(Coroutine)是Python中实现异步编程的重要方式,它比线程更轻量级,可以在单线程内实现并发操作。在 Python 中,协程(Coroutine)是一种特殊的函数,它允许在执行过程中暂停和恢复。协程是异步编程的重要组成部分,尤其是在处理 I/O 密集型任务时,能够显著提高程序的效率。Python 的 asyncio
模块为协程提供了强大的支持。下面我将通过详细案例讲解Python协程函数的使用。
一、协程基础概念
协程是一种用户态的轻量级线程,由用户控制调度。与线程不同,协程的切换不需要操作系统介入,因此开销更小。
1. 协程与普通函数的区别
-
普通函数:调用时进入,返回时退出,一次性执行完毕
-
协程函数:可以暂停执行,保留状态,后续可以从暂停处恢复执行
二、协程的实现方式
Python中实现协程主要有三种方式:
-
生成器实现的协程(Python 2.5+)
-
asyncio库实现的协程(Python 3.4+)
-
async/await语法实现的协程(Python 3.5+)
我们主要讲解现代Python(3.5+)推荐的async/await方式。
三、async/await基础语法
协程的基本概念
- 定义协程函数:使用
async def
关键字定义协程函数。async def my_coroutine():print("Hello from coroutine")
- 调用协程函数:协程本身不会自动执行,需要显式调用(例如通过事件循环)或通过
await
来调度。直接调用协程函数不会执行它,而是返回一个协程对象:coro = my_coroutine() # 此时不会执行
- 运行协程:要运行协程,需要事件循环,asyncio库的核心,负责调度和执行协程:
Python 3.7+可以简化为:import asyncioasync def my_coroutine():print("Hello from coroutine")# 获取事件循环 loop = asyncio.get_event_loop() # 运行协程直到完成 loop.run_until_complete(my_coroutine()) loop.close()
输出:asyncio.run(my_coroutine())
Hello from coroutine
-
暂停与恢复:协程可以在
await
表达式处暂停,等待某个异步操作完成,然后恢复执行。
四、协程案例讲解
示例1:简单的协程
以下是一个简单的协程示例,展示了如何定义、启动和使用协程:
import asyncio# 定义一个协程函数
async def say_hello(name, delay):print(f"Hello, {name}! Waiting for {delay} seconds...")await asyncio.sleep(delay) # 模拟异步操作,协程在此处暂停print(f"Hello again, {name}!")# 主协程,用于调度其他协程
async def main():# 将协程包装为任务并调度执行task1 = asyncio.create_task(say_hello("Alice", 2))task2 = asyncio.create_task(say_hello("Bob", 1))# 等待所有任务完成await task1await task2# 启动事件循环并运行main协程
asyncio.run(main())
Hello, Alice! Waiting for 2 seconds...
Hello, Bob! Waiting for 1 seconds...
Hello again, Bob!
Hello again, Alice!
案例2:基本协程使用
import asyncioasync def my_coroutine(name, delay):await asyncio.sleep(delay) # 模拟IO操作print(f"Hello, {name}!")async def main():print("Starting...")await my_coroutine("Alice", 1) # 等待1秒await my_coroutine("Bob", 2) # 再等待2秒print("Finished!")asyncio.run(main())
输出:
Starting...
Hello, Alice! # 1秒后
Hello, Bob! # 再过2秒后(总共3秒)
Finished!
案例3:并发执行多个协程
使用asyncio.gather()
可以并发运行多个协程,协程的执行顺序取决于它们的耗时:
import asyncioasync def fetch_data(name, delay):print(f"Fetching data {name} started")await asyncio.sleep(delay) # 模拟网络请求print(f"Data {name} received!")return f"data-{name}"async def main():# 使用gather并发执行多个协程results = await asyncio.gather(fetch_data('A', 2),fetch_data('B', 1),fetch_data('C', 3))print(f"All done! Results: {results}")asyncio.run(main())
输出:
Fetching data A started
Fetching data B started
Fetching data C started
Data B received! # 1秒后
Data A received! # 2秒后
Data C received! # 3秒后
All done! Results: ['data-A', 'data-B', 'data-C']
(注意:总共耗时约3秒(最长的任务),而不是6秒(2+1+3)。)
案例4:协程与任务(Task)
import asyncioasync def worker(name, queue):while True:delay = await queue.get()print(f"{name} starting task with delay {delay}")await asyncio.sleep(delay)print(f"{name} finished task with delay {delay}")queue.task_done()async def main():queue = asyncio.Queue()# 放入一些任务for delay in [1, 2, 3, 1, 2]:await queue.put(delay)# 创建3个worker任务tasks = []for i in range(3):task = asyncio.create_task(worker(f"Worker-{i}", queue))tasks.append(task)# 等待队列清空await queue.join()# 取消worker任务for task in tasks:task.cancel()# 等待所有worker任务被取消await asyncio.gather(*tasks, return_exceptions=True)asyncio.run(main())
输出:
Worker-0 starting task with delay 1
Worker-1 starting task with delay 2
Worker-2 starting task with delay 3
Worker-0 finished task with delay 1
Worker-0 starting task with delay 1
Worker-1 finished task with delay 2
Worker-1 starting task with delay 2
Worker-0 finished task with delay 1
Worker-1 finished task with delay 2
Worker-2 finished task with delay 3
案例5:超时处理
import asyncioasync def slow_operation():print("Starting slow operation")await asyncio.sleep(5) # 模拟耗时操作print("Slow operation completed")return "Result"async def main():try:# 设置3秒超时result = await asyncio.wait_for(slow_operation(), timeout=3)print(f"Got result: {result}")except asyncio.TimeoutError:print("Operation timed out!")asyncio.run(main())
输出:
Starting slow operation
Operation timed out! # 3秒后
# 不会看到"Slow operation completed"
案例6:实际HTTP请求示例
import asyncio
import aiohttp # 需要安装: pip install aiohttp
from aiohttp import TCPConnectorasync def fetch_url(session, url):print(f"Fetching {url}")async with session.get(url) as response:return await response.text()async def main():urls = ["https://www.baidu.com","https://www.python.org","https://cn.bing.com/"]#解决aiohttp证书出错的问题:证书设置为False即可解决aiohttp.ClientSession(connector=TCPConnector(ssl=False))async with aiohttp.ClientSession(connector=TCPConnector(ssl=False)) as session:tasks = [fetch_url(session, url) for url in urls]results = await asyncio.gather(*tasks)for url, content in zip(urls, results):print(f"{url} returned {len(content)} bytes")asyncio.run(main())
输出:
Fetching https://www.baidu.com
Fetching https://www.python.org
Fetching https://cn.bing.com/
https://www.baidu.com returned 28918 bytes
https://www.python.org returned 50832 bytes
https://cn.bing.com/ returned 180243 bytes
五、协程原理与注意事项
1. 协程工作原理
-
协程通过事件循环(Event Loop)实现并发
-
遇到
await
表达式时,协程暂停并将控制权交还给事件循环 -
事件循环调度其他就绪的协程运行
-
当
await
的操作完成时,协程从暂停处恢复执行
2. 注意事项
-
不要在协程中使用阻塞IO:如
time.sleep()
会阻塞整个线程 -
协程需要被await:忘记await会导致协程不执行
-
合理控制并发量:过多并发可能导致资源耗尽
-
错误处理:协程中的异常需要通过
try/except
捕获
3. 协程适用场景
-
高并发的网络IO操作
-
需要处理大量连接的服务器
-
需要并发执行但线程开销太大的场景
-
需要精细控制执行流程的异步任务
六、总结
Python的协程通过async/await语法提供了清晰的异步编程模型。关键点:
-
使用
async def
定义协程函数 -
使用
await
暂停协程执行,等待异步操作完成 -
使用
asyncio.run()
运行顶层协程 -
使用
asyncio.gather()
并发运行多个协程 -
使用
asyncio.create_task()
创建后台任务
协程是Python异步编程的核心,掌握它可以显著提高IO密集型应用的性能。