当前位置: 首页 > news >正文

Python协程详解:从基础到实战

协程是Python中实现并发编程的重要方式之一,它比线程更轻量级,能够高效处理I/O密集型任务。本文将全面介绍协程的概念、原理、实现方式以及与线程、进程的对比,包含完整的效率对比代码和详细说明,帮助Python开发者深入理解并掌握协程技术。

1. 协程基础概念

1.1 什么是协程

协程(Coroutine)是Python中另外一种实现多任务的方式,它是一种比线程更小的执行单元,占用更少的资源。协程之所以被称为执行单元,是因为它自带CPU上下文。这意味着只要在合适的时机,我们可以把一个协程切换到另一个协程,只要在这个过程中保存或恢复CPU上下文,程序就可以继续运行。

通俗地说:在一个线程中的某个函数,可以在任何地方保存当前函数的一些临时变量等信息,然后切换到另外一个函数中执行(注意不是通过调用函数的方式做到的),并且切换的次数以及什么时候再切换到原来的函数都由开发者自己确定。

1.2 为什么需要协程

在传统多线程编程中,我们知道:

  • 进程是资源分配的最小单位

  • 线程是CPU调度的最小单位

虽然多线程已经提高了CPU利用率,但创建和管理线程/进程需要消耗系统资源。随着对效率的追求不断提高,基于单线程实现并发成为一个新的课题,即只用一个主线程(很明显可利用的CPU只有一个)情况下实现并发。这样可以节省创建线程/进程所消耗的时间。

1.3 并发的本质

并发的本质可以概括为:切换+保存状态

CPU正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制):

  1. 该任务发生了阻塞(如I/O操作)

  2. 该任务计算的时间过长(时间片用完)

其中第二种情况并不能提升效率,只是为了让CPU能够"雨露均沾",实现看起来所有任务都被"同时"执行的效果。如果多个任务都是纯计算的,这种切换反而会降低效率。

2. 协程与线程的差异

在实现多任务时,线程切换从系统层面远不止保存和恢复CPU上下文这么简单。操作系统为了程序运行的高效性,每个线程都有自己缓存Cache等数据,操作系统还会帮你做这些数据的恢复操作。所以线程的切换非常耗性能。

相比之下,协程的切换只是单纯地操作CPU的上下文,所以一秒钟切换个上百万次系统都抗得住。具体差异如下:

特性线程协程
调度者操作系统用户程序
切换代价高(涉及内核态切换)低(用户态切换)
内存占用较大(MB级)极小(KB级)
并发数量有限(千级)极高(百万级)
数据同步需要锁机制无需加锁

3. Python协程的特点

  1. 必须在单线程里实现并发:协程的本质是单线程下的并发

  2. 修改共享数据不需加锁:因为是单线程,不存在竞争条件

  3. 用户程序自己保存多个控制流的上下文栈

  4. 自动I/O切换:一个协程遇到I/O操作自动切换到其它协程(需要gevent等模块支持)

4. Python协程的实现方式

4.1 yield模拟协程

Python中的生成器(generator)和yield关键字可以初步模拟协程的行为:

def consumer():while True:x = yield  # 接收send发送的值print(f"处理数据: {x}")def producer():c = consumer()c.__next__()  # 启动生成器for i in range(5):print(f"生产数据: {i}")c.send(i)  # 发送数据给consumerproducer()

这段代码展示了生产者-消费者模型的基本协程实现。生产者通过send()方法将数据发送给消费者,消费者通过yield接收并处理数据。

4.2 Greenlet模块

Greenlet是一个更专业的协程实现,它提供了明确的切换方法:

from greenlet import greenlet
import timedef task1():print("执行任务1")gr2.switch()  # 切换到任务2print("任务1继续执行")gr2.switch()def task2():print("执行任务2")gr1.switch()  # 切换回任务1print("任务2继续执行")gr1 = greenlet(task1)
gr2 = greenlet(task2)
gr1.switch()  # 启动任务1

Greenlet的优点是切换明确,缺点是需要手动切换,无法自动识别I/O阻塞。

4.3 Gevent模块

Gevent是基于Greenlet的更高级协程库,它能够自动处理I/O阻塞:

import gevent
from gevent import monkey
monkey.patch_all()  # 打补丁,替换标准库中的阻塞式I/Odef fetch(url):print(f"获取 {url}")gevent.sleep(2)  # 模拟I/O操作print(f"{url} 返回数据")def async_fetch():urls = ['url1', 'url2', 'url3']jobs = [gevent.spawn(fetch, url) for url in urls]gevent.joinall(jobs)async_fetch()

Gevent的关键点:

  1. monkey.patch_all():替换Python标准库中的阻塞式I/O为Gevent的非阻塞版本

  2. gevent.spawn():创建协程任务

  3. gevent.joinall():等待所有协程完成

5. 协程效率对比测试

5.1 顺序执行 vs 协程执行

下面我们通过一个完整的例子来对比顺序执行和协程执行的效率差异:

from gevent import monkey
import gevent
import time
import requests# 打补丁
monkey.patch_all()# 测试URL
url = "https://www.baidu.com"# 顺序执行
def sequential_fetch():start = time.time()for i in range(5):print(f"顺序请求 {i} 开始")response = requests.get(url)print(f"顺序请求 {i} 完成,状态码: {response.status_code}")print(f"顺序执行总耗时: {time.time() - start:.2f}秒")# 协程执行
def coroutine_fetch():start = time.time()def fetch(i):print(f"协程请求 {i} 开始")response = requests.get(url)print(f"协程请求 {i} 完成,状态码: {response.status_code}")jobs = [gevent.spawn(fetch, i) for i in range(5)]gevent.joinall(jobs)print(f"协程执行总耗时: {time.time() - start:.2f}秒")# 执行测试
print("=== 顺序执行测试 ===")
sequential_fetch()print("\n=== 协程执行测试 ===")
coroutine_fetch()

执行结果分析

  • 顺序执行:每个请求依次执行,总耗时≈各请求耗时之和

  • 协程执行:所有请求并发执行,总耗时≈最慢的单个请求耗时

5.2 线程 vs 协程效率对比

下面我们对比线程和协程在处理I/O密集型任务时的效率:

import threading
import time
import requests
from gevent import monkey
import geventmonkey.patch_all()
url = "https://www.baidu.com"
n = 10  # 并发数量# 线程方式
def thread_fetch():start = time.time()def fetch(i):response = requests.get(url)print(f"线程 {i} 完成")threads = []for i in range(n):t = threading.Thread(target=fetch, args=(i,))t.start()threads.append(t)for t in threads:t.join()print(f"线程方式总耗时: {time.time() - start:.2f}秒")# 协程方式
def gevent_fetch():start = time.time()def fetch(i):response = requests.get(url)print(f"协程 {i} 完成")jobs = [gevent.spawn(fetch, i) for i in range(n)]gevent.joinall(jobs)print(f"协程方式总耗时: {time.time() - start:.2f}秒")# 执行测试
print("=== 线程方式测试 ===")
thread_fetch()print("\n=== 协程方式测试 ===")
gevent_fetch()

执行结果分析

  • 线程方式:创建线程有一定开销,线程切换需要内核参与

  • 协程方式:协程创建和切换开销极小,完全在用户空间完成

5.3 不同并发量下的效率对比

我们测试不同并发量下协程的执行效率:

import time
from gevent import monkey
import gevent
import requestsmonkey.patch_all()def test_concurrency(concurrency):print(f"\n=== 并发数: {concurrency} ===")start = time.time()def task(i):requests.get("https://www.baidu.com")print(f"任务 {i} 完成", end=" | ")jobs = [gevent.spawn(task, i) for i in range(concurrency)]gevent.joinall(jobs)elapsed = time.time() - startprint(f"\n并发数 {concurrency} 总耗时: {elapsed:.2f}秒")return elapsed# 测试不同并发量
concurrency_levels = [10, 50, 100, 200, 500]
results = {}
for level in concurrency_levels:results[level] = test_concurrency(level)# 打印结果对比
print("\n=== 结果汇总 ===")
for level, time_cost in results.items():print(f"并发数 {level}: {time_cost:.2f}秒")

预期结果

  • 小并发量时总耗时接近单个请求耗时

  • 随着并发量增加,总耗时增长缓慢

  • 协程可以轻松支持上千并发

6. 协程的实际应用

6.1 网络爬虫

协程特别适合网络爬虫这种I/O密集型应用:

import gevent
from gevent import monkey
monkey.patch_all()
import requests
from urllib.parse import urljoin
from bs4 import BeautifulSoupbase_url = "https://www.example.com"
visited = set()def crawler(url):if url in visited:returnvisited.add(url)try:print(f"抓取: {url}")response = requests.get(url)soup = BeautifulSoup(response.text, 'html.parser')# 处理页面内容...print(f"从 {url} 找到 {len(soup.find_all('a'))} 个链接")# 发现新链接links = [urljoin(base_url, a['href']) for a in soup.find_all('a', href=True)]# 创建协程抓取新链接jobs = [gevent.spawn(crawler, link) for link in links if link.startswith(base_url)]gevent.joinall(jobs)except Exception as e:print(f"抓取 {url} 出错: {e}")# 开始抓取
crawler(base_url)

6.2 Web服务器

使用协程可以轻松实现高并发的Web服务器:

from gevent.pywsgi import WSGIServer
from gevent import monkey
monkey.patch_all()def application(env, start_response):path = env['PATH_INFO']if path == '/':start_response('200 OK', [('Content-Type', 'text/html')])return [b"<h1>Welcome</h1><p>Hello from Gevent server!</p>"]elif path == '/api/data':# 模拟数据库查询gevent.sleep(1)start_response('200 OK', [('Content-Type', 'application/json')])return [b'{"data": [1, 2, 3]}']else:start_response('404 Not Found', [('Content-Type', 'text/html')])return [b"<h1>404 Not Found</h1>"]if __name__ == '__main__':print("Server running on http://localhost:8000")server = WSGIServer(('0.0.0.0', 8000), application)server.serve_forever()

6.3 数据库操作

协程化的数据库操作可以显著提高并发性能:

import gevent
from gevent import monkey
monkey.patch_all()
import pymysql
from DBUtils.PooledDB import PooledDB# 创建数据库连接池
db_pool = PooledDB(creator=pymysql,host='localhost',user='root',password='123456',db='test',maxconnections=20
)def query_user(user_id):conn = db_pool.connection()try:with conn.cursor() as cursor:sql = "SELECT * FROM users WHERE id = %s"cursor.execute(sql, (user_id,))result = cursor.fetchone()print(f"查询到用户 {user_id}: {result}")return resultfinally:conn.close()# 并发查询多个用户
user_ids = [1, 2, 3, 4, 5]
jobs = [gevent.spawn(query_user, uid) for uid in user_ids]
gevent.joinall(jobs)

7. 协程与多线程、多进程的对比

7.1 完整对比表格

特性进程线程协程
创建开销大(10+MB)中(1MB左右)极小(几KB)
切换开销高(微秒级)中(微秒级)低(纳秒级)
内存占用独立内存空间共享进程内存共享线程内存
数据共享IPC机制全局变量全局变量
并发数量数十个数百个数十万个
CPU利用多核多核(受GIL限制)单核
适用场景CPU密集型I/O+少量CPUI/O密集型
编程复杂度中(需同步)
稳定性高(隔离性好)中(一个线程崩溃影响整个进程)中(一个协程出错可能影响其他协程)

7.2 性能对比测试

下面我们通过一个完整的测试来对比进程、线程和协程的性能差异:

import time
import multiprocessing
import threading
from gevent import monkey
import geventmonkey.patch_all()# 测试任务:模拟I/O密集型操作
def task(n):gevent.sleep(1)  # 模拟I/O操作return n * n# 进程方式
def process_way():start = time.time()pool = multiprocessing.Pool(processes=10)results = pool.map(task, range(10))pool.close()pool.join()print(f"进程方式 结果: {results} 耗时: {time.time() - start:.2f}秒")# 线程方式
def thread_way():start = time.time()threads = []results = []def worker(n):results.append(task(n))for i in range(10):t = threading.Thread(target=worker, args=(i,))t.start()threads.append(t)for t in threads:t.join()print(f"线程方式 结果: {results} 耗时: {time.time() - start:.2f}秒")# 协程方式
def coroutine_way():start = time.time()jobs = [gevent.spawn(task, i) for i in range(10)]gevent.joinall(jobs)results = [job.value for job in jobs]print(f"协程方式 结果: {results} 耗时: {time.time() - start:.2f}秒")# 执行测试
print("=== 进程方式测试 ===")
process_way()print("\n=== 线程方式测试 ===")
thread_way()print("\n=== 协程方式测试 ===")
coroutine_way()

预期结果

  • 进程方式:耗时≈1秒,但创建进程开销大

  • 线程方式:耗时≈1秒,受GIL影响

  • 协程方式:耗时≈1秒,但资源占用最少

8. 协程最佳实践

8.1 避免CPU密集型任务

协程不适合纯CPU计算任务,下面是一个反面例子:

import time
from gevent import monkey
import geventmonkey.patch_all()# CPU密集型任务
def cpu_intensive(n):result = 0for i in range(n):result += i * ireturn result# 测试协程执行CPU密集型任务
def test_coroutine_cpu():start = time.time()jobs = [gevent.spawn(cpu_intensive, 1000000) for _ in range(10)]gevent.joinall(jobs)print(f"协程执行CPU密集型任务耗时: {time.time() - start:.2f}秒")# 测试顺序执行
def test_sequential_cpu():start = time.time()results = [cpu_intensive(1000000) for _ in range(10)]print(f"顺序执行CPU密集型任务耗时: {time.time() - start:.2f}秒")print("=== CPU密集型任务测试 ===")
test_coroutine_cpu()
test_sequential_cpu()

结论:对于CPU密集型任务,协程不会带来性能提升,反而可能因为切换开销而降低性能。

8.2 合理控制并发量

虽然协程很轻量,但也需要合理控制并发量:

from gevent import monkey
import gevent
import requests
import timemonkey.patch_all()def fetch(url, semaphore):with semaphore:  # 控制并发量print(f"开始请求 {url}")response = requests.get(url)print(f"完成请求 {url} 状态码: {response.status_code}")def controlled_concurrency(url, concurrency=10, total=100):start = time.time()semaphore = gevent.pool.Semaphore(concurrency)  # 并发信号量jobs = [gevent.spawn(fetch, url, semaphore) for _ in range(total)]gevent.joinall(jobs)print(f"总请求数: {total} 并发数: {concurrency} 总耗时: {time.time() - start:.2f}秒")# 测试不同并发控制
url = "https://www.baidu.com"
controlled_concurrency(url, concurrency=10, total=100)
controlled_concurrency(url, concurrency=20, total=100)
controlled_concurrency(url, concurrency=50, total=100)

8.3 异常处理

正确处理协程中的异常:

import gevent
from gevent import monkey
monkey.patch_all()def successful_task():return "成功完成任务"def failing_task():raise Exception("任务执行失败")def handle_exceptions():jobs = [gevent.spawn(successful_task),gevent.spawn(failing_task),gevent.spawn(successful_task)]gevent.joinall(jobs)for job in jobs:if job.successful():print(f"任务结果: {job.value}")else:print(f"任务失败: {job.exception}")print("=== 异常处理测试 ===")
handle_exceptions()

9. 常见问题解答

Q: 协程能利用多核CPU吗?
A: 单个协程不能,因为它在单线程中运行。但可以通过多进程+协程的方式利用多核,例如每个进程运行一个事件循环。

Q: 协程会取代线程吗?
A: 不会完全取代。协程适合I/O密集型场景,线程适合需要利用多核或与C扩展交互的场景。最佳实践是根据需求选择合适的并发模型。

Q: Gevent的monkey patch安全吗?
A: 在大多数情况下是安全的,但应注意:

  1. 尽早调用monkey.patch_all()

  2. 避免与其他修改标准库的库同时使用

  3. 生产环境中先充分测试

Q: 如何调试协程程序?
A: 调试协程程序可以使用:

  1. 打印日志

  2. gevent.getcurrent()获取当前协程

  3. 使用支持协程的调试器如PyCharm专业版

10. 总结

协程是Python中强大的并发编程工具,特别适合I/O密集型应用。通过本文的完整介绍和代码示例,我们了解了:

  1. 协程的基本概念和原理

  2. Python中实现协程的三种方式:yield、greenlet、gevent

  3. 完整的效率对比测试代码和结果分析

  4. 协程在实际项目中的应用场景

  5. 协程与线程、进程的详细对比

  6. 协程编程的最佳实践和常见问题

掌握协程技术可以让你的Python程序在处理高并发时更加高效和优雅。希望本文能帮助你全面理解Python协程,并在实际项目中灵活应用。

相关文章:

  • 技巧-多数元素
  • 软件开发过程通常包含多个阶段,结合 AI 应用,可规划出以下 Markdown 文件名称的资料来记录各阶段内容
  • 深度强化学习的AI智能体实战:从训练到部署全流程解析
  • 码上云端·实战征文|无需邀请码,OpenManus深度测评
  • Python中的 for 与 迭代器
  • 第14章:MCP服务端项目开发实战:多模态信息处理
  • 每日算法-250424
  • 黑客密码:解锁互联网提问的智慧密码
  • 解决NSMutableData appendData性能开销太大的问题
  • Linux命令行基础入门详解
  • 09前端项目----分页功能
  • 通过监督微调(SFT)提升AI Agent效果的完整指南
  • 2025年3月电子学会青少年机器人技术(五级)等级考试试卷-实际操作
  • 小刚说C语言刷题——1317正多边形每个内角的度数?
  • 项目班——0419——chrono时间库
  • Redis 与 Memcache 全面对比:功能、性能与应用场景解析
  • mysql——索引事务和JDBC编程
  • 项目——高并发内存池
  • RHCE练习1
  • C语言——函数
  • “80后”王建浩履新三沙市委常委、组织部部长、秘书长
  • 最高法:“盗链”属于信息网络传播行为,构成侵犯著作权罪
  • 凯撒旅业:2024年营业收入约6.53亿元,同比增长12.25%
  • 上海4-6月文博美展、剧目演出不断,将开设直播推出文旅优惠套餐
  • 厚植民营企业家成长土壤是民营经济高质量发展的关键
  • 百年前的亚裔艺术家与巴黎