事件循环
'''
协程编码三要素:
1. 事件循环
2. 回调函数 :驱动生成器运行
3. I/O多路复用 :SELECT、EPOLL
asyncio是python用于解决异步I/O编程的一整套方案
'''
import asyncio
import time
from functools import partial
async def testAsync(*args):
print("this is a test for async")
# 同步阻塞的 time.sleep() 不能用于异步阻塞
# 可以通过循环执行10次来比较 time.sleep() 和 await asyncio.sleep(2)的差别
# time.sleep(2)
await asyncio.sleep(2)
print("test again")
return "lixingyun"
# 必须指定 future 参数,这个 future 就是调用 callback 的那个函数
# 如果回调函数有参数,那么参数必须放在前面,且必须通过 partial 函数封装后再调用
def callback(args, future):
print("callback ==>", future.result(), "another ==>", args)
if __name__ == "__main__":
# 开启事件循环,代替之前写过的 while...selector...
start = time.time()
loop = asyncio.get_event_loop()
# 执行协程代码
loop.run_until_complete(testAsync())
print("耗时:", time.time() - start) # 耗时: 2.001452922821045
# 如果使用 await asyncio.sleep(2) ,即使执行10次,耗时仍然是2秒
# 但如果使用 time.sleep(2),则会耗时20秒
start = time.time()
tasks = [testAsync() for i in range(2)]
loop.run_until_complete(asyncio.wait(tasks))
print("耗时:", time.time() - start) # 耗时: 2.001422166824341
# wait 和 gather 的区别在于
# gather 抽象级别更高,除了可以实现 wait 所有的功能,还可以实现分组
start = time.time()
group1 = [testAsync() for i in range(2)]
group2 = [testAsync() for i in range(2)]
# 两组一起执行
loop.run_until_complete(asyncio.gather(*group1, *group2))
print("耗时:", time.time() - start) # 耗时: 2.000610113143921
# 或者这样执行
start = time.time()
group3 = [testAsync() for i in range(2)]
group4 = [testAsync() for i in range(2)]
group3 = asyncio.gather(*group3)
group4 = asyncio.gather(*group4)
loop.run_until_complete(asyncio.gather(group3, group4))
print("耗时:", time.time() - start) # 耗时: 2.000826835632324
# 获取协程的返回值
# test_future = asyncio.ensure_future(testAsync()) 这条语句和下面的等效,用法也是一模一样
test_future = loop.create_task(testAsync())
# 当运行完成时执行指定的回调函数,不用显式地将 test_future 指定为参数,python会自动传递,类似于传递 self 一样
# 通过偏函数 partial 封装 callback 函数,将参数传递给 callback
# 所谓偏函数,是将所要承载的函数作为 partial 的第一个参数,原函数的各个参数依次作为 partial 函数后续的参数,除非使用关键字参数
test_future.add_done_callback(partial(callback, "wanglin")) # callback ==> lixingyun another ==> wanglin
loop.run_until_complete(test_future)
# 和线程的调用方法一致
print(test_future.result()) # lixingyun
# 取消协程的执行
start = time.time()
task1 = testAsync()
task2 = testAsync()
task3 = testAsync()
tasks = [task1, task2, task3]
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
all_tasks = asyncio.Task.all_tasks()
for task in all_tasks:
result = task.cancel()
print("cancel task ==>", result)
loop.stop()
# stop 之后要调用 run_forever 方法,否则抛异常
loop.run_forever()
finally:
loop.close()
print("耗时:", time.time() - start) # 耗时: 2.0014379024505615
大约 6 分钟