异步编程在在编程中越来越常见,特别是在python3支持异步编程之后,越来越多的框架也开始支持异步编程,例如tornado、fastapi、django 3.x asgi等。正确使用异步编程可以提升计算性能。
我们从协程的概念、asyncio异步编程模块的实战案例三个方面介绍python的异步编程。
1 协程 协程不是计算机提供,程序员人为创造出来的。
协程被称为微线程,是一种用户态内在上下文切换的技术。简而言之,其实就是通过一个线程实现代码块互相切换执行。例如:
1 2 3 4 5 6 7 8 9 10 def func1 (): print (1 ) ... print (2 ) def func2 (): print (3 ) ... print (4 ) func1() func2()
协程就是让人为控制在函数之间来回切换(一个线程)
实现协程的几种方法:
1.1 greelet实现协程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from greenlet import greenletdef func1 (): print (1 ) gr2.switch() print (2 ) gr2.switch() def func2 (): print (3 ) gr1.switch() print (4 ) gr1 = greenlet(func1) gr2 = greenlet(func2) gr1.switch()
1.2 yield关键字 1 2 3 4 5 6 7 8 9 10 11 def func1 (): yield 1 yield from func2() yield 2 def func2 (): yield 3 yield 4 f1 = func1() for item in f1: print (item)
1.3 asyncio 在python3.4以及之后的版本。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import asyncio@asyncio.coroutine def func1 (): print (1 ) yield from asyncio.sleep(2 ) print (2 ) @asyncio.coroutine def func2 (): print (3 ) yield from asyncio.sleep(2 ) print (4 ) tasks = [ asyncio.ensure_future( func1() ), asyncio.ensure_future( func2() ) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
注意:遇到IO阻塞会自动切换,上面的两种方法是手动切换的。
1.4 async & await关键字 在python3.5以及之后的版本。唯一的不同在于用async关键字替换装饰器,用await替换yield。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import asyncioasync def func1 (): print (1 ) await asyncio.sleep(2 ) print (2 ) async def func2 (): print (3 ) await asyncio.sleep(2 ) print (4 ) tasks = [ asyncio.ensure_future( func1() ), asyncio.ensure_future( func2() ) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
2 协程的意义 在一个线程中如果遇到IO等待的时间,线程不会等待,利用空闲的时间做一些其他的事情。
案例:去下载三张图片(网络IO)
普通的方式(同步的方式)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 """ 下载图片使用第三方模块requests,请提前安装:pip3 install requests """ import requestsdef download_image (url ): print ("开始下载:" ,url) response = requests.get(url) print ("下载完成" ) file_name = url.rsplit('_' )[-1 ] with open (file_name, mode='wb' ) as file_object: file_object.write(response.content) if __name__ == '__main__' : url_list = [ 'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg' , 'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg' , 'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg' ] for item in url_list: download_image(item)
协程的方式(异步的方式)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 """ 下载图片使用第三方模块aiohttp,请提前安装:pip3 install aiohttp """ import aiohttpimport asyncioasync def fetch (session, url ): print ("发送请求:" , url) async with session.get(url, verify_ssl=False ) as response: content = await response.content.read() file_name = url.rsplit('_' )[-1 ] with open (file_name, mode='wb' ) as file_object: file_object.write(content) print ("下载完成" ,url) async def main (): async with aiohttp.ClientSession() as session: url_list = [ 'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg' , 'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg' , 'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg' ] tasks = [asyncio.create_task(fetch(session, url)) for url in url_list] await asyncio.wait(tasks) if __name__ == '__main__' : asyncio.run(main())
3 异步编程 3.1 事件循环 理解为一个死循环,去检测并执行某些任务,在特定条件下终止循环。
1 2 3 4 5 6 7 8 9 任务列表 = [ 任务1 , 任务2 , 任务3 ,... ] while True : 可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将'可执行' 和'已完成' 的任务返回 for 就绪任务 in 已准备就绪的任务列表: 执行已就绪的任务 for 已完成的任务 in 已完成的任务列表: 在任务列表中移除 已完成的任务 如果 任务列表 中的任务都已完成,则终止循环
1 2 3 4 5 6 import asyncioloop = asyncio.get_event_loop() loop.run_until_complete(任务)
3.2 快速上手 协程函数,定义函数的时候async def
函数名。
协程对象,执行 协程函数() 得到的协程对象。
1 2 3 4 5 async def func (): pass result = funct()
注意:执行协程函数创建协程对象,函数内部代码不会执行。
如果想要运行协程函数内部代码,必须将协程对象交给事件循环来处理。
1 2 3 4 5 6 7 8 9 10 11 async def func (): print ("hhh" ) result = func() asyncio.run(result) await result()
3.3 await await + 可等待的对象(协程对象,Future、Task对象->IO等待)
示例1:
1 2 3 4 5 6 7 8 import asyncioasync def func (): print ("开始" ) response = await asyncio.sleep(2 ) print ("结束" ,response) asyncio.run( func() )
示例2:
1 2 3 4 5 6 7 8 9 10 11 12 import asyncioasync def others (): print ("start" ) await asyncio.sleep(2 ) print ('end' ) return '返回值' async def func (): print ("执行协程函数内部代码" ) response = await others() print ("IO请求结束,结果为:" , response) asyncio.run( func() )
示例3:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import asyncioasync def others (): print ("start" ) await asyncio.sleep(2 ) print ('end' ) return '返回值' async def func (): print ("执行协程函数内部代码" ) response1 = await others() print ("IO请求结束,结果为:" , response1) response2 = await others() print ("IO请求结束,结果为:" , response2) asyncio.run( func() )
await就是等待对应的值得到结果之后再继续向下走。
3.4 Task对象
Tasks are used to schedule coroutines concurrently .
When a coroutine is wrapped into a Task with functions like asyncio.create_task()
the coroutine is automatically scheduled to run soon。
白话:在时间循环中添加多个任务。
Tasks用于并发调度协程,通过asyncio.create_task(协程对象)
的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。除了使用 asyncio.create_task()
函数以外,还可以用低层级的 loop.create_task()
或 ensure_future()
函数。不建议手动实例化 Task 对象。
注意:asyncio.create_task()
函数在python 3.7中被加入,在此之前,可以改用底层级的asyncio.ensure_future()
函数。
示例1:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import asyncioasync def func (): print (1 ) await asyncio.sleep(2 ) print (2 ) return "返回值" async def main (): print ("main开始" ) task1 = asyncio.create_task(func()) task2 = asyncio.create_task(func()) print ("main结束" ) ret1 = await task1 ret2 = await task2 print (ret1, ret2) asyncio.run(main())
示例2:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import asyncioasync def func (): print (1 ) await asyncio.sleep(2 ) print (2 ) return "返回值" async def main (): print ("main开始" ) task_list = [ asyncio.create_task(func(), name="n1" ), asyncio.create_task(func(), name="n2" ) ] print ("main结束" ) done, pending = await asyncio.wait(task_list, timeout=None ) print (done, pending) asyncio.run(main())
示例3(先创建列表):
1 2 3 4 5 6 7 8 9 10 11 12 13 import asyncioasync def func (): print ("执行协程函数内部代码" ) response = await asyncio.sleep(2 ) print ("IO请求结束,结果为:" , response) coroutine_list = [func(), func()] done,pending = asyncio.run( asyncio.wait(coroutine_list) )
3.5 asyncio.Future对象(一般不使用,用于理解await)
A Future
is a special low-level awaitable object that represents an eventual result of an asynchronous operation.
Task继承Future对象,Task对象内部await结果的处理基于Future对象。
示例1:
1 2 3 4 5 6 7 8 async def main (): loop = asyncio.get_running_loop() fut = loop.create_future() await fut asyncio.run(main())
示例2:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import asyncioasync def set_after (fut ): await asyncio.sleep(2 ) fut.set_result("666" ) async def main (): loop = asyncio.get_running_loop() fut = loop.create_future() await loop.create_task(set_after(fut)) data = await fut print (data) asyncio.run(main())
3.6 concurrent.futures.Future对象 使用线程池、进程池实现异步操作时用到的对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import timefrom concurrent.futures import Futurefrom concurrent.futures.thread import ThreadPoolExecutorfrom concurrent.futures.process import ProcessPoolExecutordef func (value ): time.sleep(1 ) print (value) return 123 pool = ThreadPoolExecutor(max_workers=5 ) for i in range (10 ): fut = pool.submit(func, i) print (fut)
以后写代码中,可能会存在交叉使用。例如:crm项目80%都是基于协程异步编程 + mySQL(不支持)【线程、进程做异步编程】。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import timeimport asyncioimport concurrent.futuresdef func1 (): time.sleep(2 ) return "SB" async def main (): loop = asyncio.get_running_loop() fut = loop.run_in_executor(None , func1) result = await fut print ('default thread pool' , result) asyncio.run( main() )
案例:asyncio+不支持异步的模块
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import asyncioimport requestsasync def download_image (url ): print ("开始下载:" , url) loop = asyncio.get_event_loop() future = loop.run_in_executor(None , requests.get, url) response = await future print ('下载完成' ) file_name = url.rsplit('_' )[-1 ] with open (file_name, mode='wb' ) as file_object: file_object.write(response.content) if __name__ == '__main__' : url_list = [ 'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg' , 'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg' , 'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg' ] tasks = [download_image(url) for url in url_list] loop = asyncio.get_event_loop() loop.run_until_complete( asyncio.wait(tasks) )
3.7 异步迭代器 什么是迭代器?
实现了__iter__
和__next__
方法的对象。
什么是可迭代对象?
在类里面了__iter__
方法,并返回一个迭代器。
什么是异步迭代器
实现了 __aiter__()
和 __anext__()
方法的对象。__anext__
必须返回一个 awaitable 对象。async for
会处理异步迭代器的 __anext__()
方法所返回的可等待对象,直到其引发一个 StopAsyncIteration
异常。由 PEP 492 引入。
什么是异步可迭代对象?
可在 async for
语句中被使用的对象。必须通过它的 __aiter__()
方法返回一个 asynchronous iterator 。由 PEP 492 引入。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import asyncioclass Reader (object ): """ 自定义异步迭代器(同时也是异步可迭代对象) """ def __init__ (self ): self.count = 0 async def readline (self ): self.count += 1 if self.count == 100 : return None return self.count def __aiter__ (self ): return self async def __anext__ (self ): val = await self.readline() if val == None : raise StopAsyncIteration return val async def func (): async_iter = Reader() async for item in async_iter: print (item) asyncio.run(func())
3.8 异步上下文管理器(打开-处理-关闭) 和with很像。
此种对象通过定义 __aenter__()
和 __aexit__()
方法来对 async with
语句中的环境进行控制。由 PEP 492 引入。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import asyncioclass AsyncContextManager : def __init__ (self ): self.conn = None async def do_something (self ): return 666 async def __aenter__ (self ): self.conn = await asyncio.sleep(1 ) return self async def __aexit__ (self, exc_type, exc, tb ): await asyncio.sleep(1 ) async def func (): async with AsyncContextManager() as f: result = await f.do_something() print (result) asyncio.run(func())
4 uvloop 是asyncio事件循环的替代方案。事件循环 > 默认asyncio的事件循环。
1 2 3 4 5 6 import asyncioimport uvloopasyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) asyncio.run(...)
注意:一个asgi-> ucicore
内部就是用的uvloop
5 实战案例 5.1 异步redis 在通过python代码操作redis时,链接/操作/断开都是网络IO。
示例1:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import asyncioimport aioredisasync def execute (address, password ): print ("开始执行" , address) redis = await aioredis.create_redis(address, password=password) await redis.hmset_dict('car' , key1=1 , key2=2 , key3=3 ) result = await redis.hgetall('car' , encoding='utf-8' ) print (result) redis.close() await redis.wait_closed() print ("结束" , address) asyncio.run(execute('redis://47.93.4.198:6379' , "root!2345" ))