Python协程

文章目录

  • 解析协程运行时
  • 异步接口同步实现
  • 使用Task实现异步
  • 生产者消费者模型的协程版本
  • 参考

看到吐血 (´ཀ

1`
」 ∠)

  • 协程(Coroutine)本质上是一个函数,特点是在代码块中可以将执行权交给其他协程

  • 众所周知,子程序(函数)都是层级调用的,如果在A中调用了B,那么B执行完毕返回后A才能执行完毕。协程与子程序有点类似,但是它在执行过程中可以中断,转而执行其他的协程,在适当的时候再回来继续执行。

  • 协程与多线程相比的最大优势在于:协程是一个线程中执行,没有线程切换的开销;协程由用户决定在哪里交出控制权

  • 这里用到的是asyncio库(Python 3.7),这个库包含了大部分实现协程的魔法工具

  • 使用 async 修饰词声明异步函数

    • 使用 await 语句执行可等待对象(Coroutine、Task、Future)
    • 使用 asyncio.create_task 创建任务,将异步函数(协程)作为参数传入,等待event loop执行
    • 使用 asyncio.run 函数运行协程程序,协程函数作为参数传入

解析协程运行时

1import asyncio 2import time 3 4async def a(): 5 print("欢迎使用 a !") 6 await asyncio.sleep(1) 7 print("欢迎回到 a !") 8 9async def b(): 10 print("欢迎来到 b !") 11 await asyncio.sleep(2) 12 print("欢迎回到 b !") 13 14async def main(): 15 task1 = asyncio.create_task(a()) 16 task2 = asyncio.create_task(b()) 17 print("准备开始") 18 await task1 19 print("task1 结束") 20 await task2 21 print("task2 结束") 22 23if __name__ == "__main__": 24 start = time.perf_counter() 25 26 asyncio.run(main()) 27 28 print('花费 {} s'.format(time.perf_counter() - start)) 29 30
  • 解释:

  • 1、asyncio.run(main()),程序进入main()函数,开启事件循环

    • 2、创建任务task1、task2并进入事件循环等待运行
    • 3、输出准备开始
    • 4、执行await task1,用户选择从当前主任务中切出,事件调度器开始调度 a
    • 5、a 开始运行,输出欢迎使用a!,运行到await asyncio.sleep(1),从当前任务切出,事件调度器开始调度 b
    • 6、b 开始运行,输出欢迎来到b!,运行到await asyncio.sleep(2),从当前任务切出
    • 7、以上事件运行时间非常短(毫秒),事件调度器开始暂停调度
    • 8、一秒钟后,a的sleep完成,事件调度器将控制权重新交给a,输出欢迎回到a!,task1完成任务,退出事件循环
    • 9、await task1完成,事件调度器将控制权还给主任务,输出task1结束,然后在await task2处继续等待
    • 10、两秒钟后,b的sleep完成,事件调度器将控制权重新传给 b,输出欢迎回到 b!,task2完成任务,从事件循环中退出
    • 11、事件调度器将控制权交还给主任务,主任务输出task2结束,至此协程任务全部结束,事件循环结束。

上面的代码也可以这样写,将15到21行换成一行await asyncio.gather(a(), b())也能实现类似的效果,await asyncio.gather 会并发运行传入的可等待对象(Coroutine、Task、Future)。

1import asyncio 2import time 3 4async def a(): 5 print("欢迎使用 a !") 6 await asyncio.sleep(1) 7 print("欢迎回到 a !") 8 9async def b(): 10 print("欢迎来到 b !") 11 await asyncio.sleep(2) 12 print("欢迎回到 b !") 13 14async def main(): 15 await asyncio.gather(a(), b()) 16 17if __name__ == "__main__": 18 start = time.perf_counter() 19 20 asyncio.run(main()) 21 22 print('花费 {} s'.format(time.perf_counter() - start)) 23 24

异步接口同步实现

1""" 2- 简单爬虫模拟 3- 这里用异步接口写了个同步代码 4""" 5 6import asyncio 7import time 8 9async def crawl_page(url): 10 print('crawling {}'.format(url)) 11 sleep_time = int(url.split('_')[-1]) 12 await asyncio.sleep(sleep_time) # 休眠 13 print('OK {}'.format(url)) 14 15async def main(urls): 16 for url in urls: 17 await crawl_page(url) # await会将程序阻塞在这里,进入被调用的协程函数,执行完毕后再继续 18 19 20start = time.perf_counter() 21 22# pip install nest-asyncio 23asyncio.run(main(['url_1', 'url_2'])) # 协程接口 24 25print("Cost {} s".format(time.perf_counter() - start)) 26 27

使用Task实现异步

1# 异步实现 2 3import asyncio 4import time 5 6async def crawl_page(url): 7 print('crawling {}'.format(url)) 8 sleep_time = int(url.split('_')[-1]) 9 await asyncio.sleep(sleep_time) 10 print('OK {}'.format(url)) 11 12async def main(urls): 13 tasks = [asyncio.create_task(crawl_page(url)) for url in urls] 14 for task in tasks: 15 await task 16 # 14、15行也可以换成这一行await asyncio.gather(*tasks) 17 # *tasks 解包列表,将列表变成了函数的参数,与之对应的是,** dict 将字典变成了函数的参数 18 19start = time.perf_counter() 20 21asyncio.run(main(['url_1', 'url_2'])) 22 23print("Cost {} s".format(time.perf_counter() - start)) 24 25

生产者消费者模型的协程版本

1# 极客时间:Python核心技术与实战 2 3import asyncio 4import random 5import time 6 7async def consumer(queue, id): 8 """消费者""" 9 while True: 10 val = await queue.get() 11 print('{} get a val : {}'.format(id, val)) 12 await asyncio.sleep(1) 13 14 15async def producer(queue, id): 16 """生产者""" 17 for _ in range(5): 18 val = random.randint(1, 10) 19 await queue.put(val) 20 print('{} put a val: {}'.format(id, val)) 21 await asyncio.sleep(1) 22 23async def main(): 24 queue = asyncio.Queue() 25 26 consumer_1 = asyncio.create_task(consumer(queue, 'consumer_1')) 27 consumer_2 = asyncio.create_task(consumer(queue, 'consumer_2')) 28 29 producer_1 = asyncio.create_task(producer(queue, 'producer_1')) 30 producer_2 = asyncio.create_task(producer(queue, 'producer_2')) 31 32 await asyncio.sleep(10) 33 # cancel掉执行之间过长的consumer_1、consumer_2,while True 34 consumer_1.cancel() 35 consumer_2.cancel() 36 37 # return_exceptions 设为True,不让异常throw到执行层,影响后续任务的执行 38 await asyncio.gather(consumer_1, consumer_2, producer_1, producer_2, return_exceptions=True) 39 40if __name__ == "__main__": 41 start = time.perf_counter() 42 43 asyncio.run(main()) 44 45 print("Cost {} s".format(time.perf_counter() - start)) 46 47

在这里插入图片描述

1# 输出结果 2producer_1 put a val: 7 3producer_2 put a val: 4 4consumer_1 get a val : 7 5consumer_2 get a val : 4 6producer_1 put a val: 6 7producer_2 put a val: 1 8consumer_2 get a val : 6 9consumer_1 get a val : 1 10producer_1 put a val: 8 11producer_2 put a val: 1 12consumer_1 get a val : 8 13consumer_2 get a val : 1 14producer_1 put a val: 6 15producer_2 put a val: 4 16consumer_2 get a val : 6 17consumer_1 get a val : 4 18producer_1 put a val: 7 19producer_2 put a val: 8 20consumer_1 get a val : 7 21consumer_2 get a val : 8 22Cost 10.0093015 s 23 24

拓展阅读:Python的生产者消费者模型,看这篇就够了

参考

代码交流 2021