协程coroutine
大约 5 分钟
I/O模型
# 使用 SELECT 实现 HTTP 请求
import socket
from urllib.parse import urlparse
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
selector = DefaultSelector()
urls = ["https://www.baidu.com"]
stop = False
class HttpClient:
# 回调函数:处理发送数据
def handle_send(self, key):
selector.unregister(key.fd)
self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8"))
# 继续监听是否可读,然后回调指定的方法
selector.register(self.client.fileno(), EVENT_READ, self.handle_read)
# 回调函数:接收返回数据
def handle_read(self, key):
recv = self.client.recv(1024)
if recv:
self.data += recv
else:
selector.unregister(key.fd)
data = self.data.decode("utf8")
html_data = data.split("\r\n\r\n")[1]
print(html_data)
self.client.close()
# 解决 Windows 下报错问题
urls.remove(self.orig_url)
if not urls:
global stop
stop = True
def get_html(self, url):
self.orig_url = url
url = urlparse(url)
self.host = url.netloc
self.path = url.path
self.data = b""
if self.path == "":
self.path = "/"
self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 使用非阻塞IO
self.client.setblocking(False)
try:
self.client.connect((self.host, 80))
except BlockingIOError as e:
pass
# 注册到 Selector,然后回调指定的方法
selector.register(self.client.fileno(), EVENT_WRITE, self.handle_send)
'''
SELECT + 回调 + 事件循环:不停地请求 Socket 的状态并调用对应的回调函数
这种`SELECT + 回调 + 事件循环`的方式性能比单纯使用 Socket 要高
因为它将一个完整的步骤拆分成了若干个阶段,每个阶段都是独立且不阻塞的,相互之间也没有耦合
但回调过多会造成代码难以维护,难于追踪,共享变量困难,以及难于处理异常的问题
'''
# 事件循环
def loop():
# # Linux 下适用
# while True:
# events = selector.select()
# for key, mask in events:
# callback = key.data
# callback(key)
# Windows/Linux 下适用
while not stop:
events = selector.select()
for key, mask in events:
callback = key.data
callback(key)
if __name__ == "__main__":
httpClient = HttpClient()
httpClient.get_html("https://www.baidu.com")
loop()
yield
'''
协程是为了同时解决下面三个问题而出现的:
1. 回调模式编码复杂度高
2. 同步编程的并发性低
3. 多线程编程的Lock性能较低且易造成死锁
协程(coroutine):
1. 采用同步的方式编写异步代码
2. 通过单线程调度和切换任务,也就是可以暂停在某个地方的执行,然后在适当的时候恢复执行
3. 不再需要锁
生成器不仅可以生成一个值,也接收一个值,或者同时接收与生成值
1. 返回值 "http://www.baiodu.com" 给调用方
2. 调用方将值设置给 html
'''
# 生成器的 send 方法
def generate_func():
html = yield "http://www.baiodu.com"
print(html)
yield 2
yield 3
return "lixingyun"
# 生成器的 close 方法
def close_func():
yield "http://www.aliyun.com"
yield 2
yield 3
return "lixingyun"
# 生成器的 throw 方法
def throw_func():
try:
yield "http://www.weixin.com"
except Exception as e:
pass
yield 2
yield 3
return "lixingyun"
gen = generate_func()
# 使用生成器的方式:next 和 send
# 在调用 send 发送非 None 值之前,必须启动一次生成器:要么调用 next() 方法,要么调用 send(None) 方法
url = next(gen)
# 传递值进入生成器内部,并让生成器执行到一下歌个yield的位置
gen.send("lixingyun") # lixingyun
print(url) # http://www.baiodu.com
print(next(gen)) # 3
print()
clf = close_func()
print(next(clf)) # http://www.aliyun.com
clf.close()
# print(next(clf))
print()
thr = throw_func()
print(next(thr)) # http://www.weixin.com
thr.throw(Exception, "connect1 error")
print(next(thr)) # 3
# thr.throw(Exception, "connect2 error")
print()
# # yield from 语法
# from itertools import chain
# my_list = [1, 2, 3]
# my_dict = {"a": 1, "b": 2}
# my_set = {4, 5, 6}
# my_chain1 = chain(my_list, my_dict, my_set, range(7, 10))
# for item in my_chain1:
# print(item)
# print()
#
# # 使用 yield from 实现自定义 chain
# def custom_chain(*args, **kwargs):
# for item in args:
# # yield from <可迭代对象> 等同于 下面的 for i in item: 循环
# yield from item
# # for i in item:
# # yield i
#
# my_chain2 = custom_chain(my_list, my_dict, my_set, range(7, 10))
# for item in my_chain2:
# print(item)
# print()
def func1(iterable):
# 直接返回 iterable
yield iterable
def func2(iterable):
# 会遍历并返回 iterable 中的每个元素
yield from iterable
for item in func1(range(5)):
print(item, end="\t") # range(0, 5)
print()
for item in func2(range(5)):
print(item, end="\t") # 0 1 2 3 4
print()
def func3(generator):
# 会遍历并返回 生成器 中的每个元素
yield from generator
'''
1. main 调用方
2. func 委托生成器,这个委托的生成器,就是协程
3. generator 子生成器
yield from 会在调用方 main 与 子生成器 generator 之间建立一个双向通道
'''
def main(generator):
func = func3(generate_func())
func.send(None)
# yield from 的综合例子
result = {}
# 先单独调用 sum_sales
def single_sum_sales(product):
total = 0
nums = []
while True:
x = yield
print(product + "销量: ", x)
if not x:
break
total += x
nums.append(x)
return total, nums
sums = single_sum_sales("黑神话·悟空")
sums.send(None)
sums.send(1) # 黑神话·悟空销量: 1
sums.send(3) # 黑神话·悟空销量: 3
sums.send(7) # 黑神话·悟空销量: 7
# sums.send(None) # StopIteration: (11, [1, 3, 7])
# 销量统计
def sum_sales(product):
total = 0
nums = []
while True:
x = yield
print(product + "销量: ", x)
if not x:
break
total += x
nums.append(x)
return total, nums
# 引入委托生成器(也就是协程)
def delegate_coroutine(args):
while True:
# 如果直接调用 sum_sales(args),当最后给它发送 send(None) 时会抛出异常:StopIteration: (11, [1, 3, 7])
# 但通过 yield from sum_sales(args),就把异常处理了,而且返回了需要的数据
result[args] = yield from sum_sales(args)
# 定义调用方
def main():
data_dict = {
"黑神话·悟空": [1, 3, 7],
"魔兽世界": [2, 4, 6, 10],
"王者荣耀": [1, 2, 3, 4],
}
for key, data_list in data_dict.items():
print("开始统计《" + key + "》的销量")
# 创建生成器(协程)
delegate = delegate_coroutine(key)
# 启动生成器(协程),也可以用 next(delegate)
# next(delegate)
delegate.send(None)
for data in data_list:
# 向生成器(协程)发送数据
delegate.send(data)
# 统计完成后发送 None 结束
delegate.send(None)
print("统计结果 ==>", result)
main() # 统计结果 ==> {'黑神话·悟空': (11, [1, 3, 7]), '魔兽世界': (22, [2, 4, 6, 10]), '王者荣耀': (10, [1, 2, 3, 4])}
# 生成器也是可以暂停的函数
import inspect
def func4():
yield 1
return "lixingyun"
generate_func = func4()
print(inspect.getgeneratorstate(generate_func)) # GEN_CREATED
print(next(generate_func)) # 1
print(inspect.getgeneratorstate(generate_func)) # GEN_SUSPENDED
try:
print(next(generate_func)) # 无输出
except StopIteration as e:
pass
print(inspect.getgeneratorstate(generate_func)) # GEN_CLOSED
async和await
# 除了可以用生成器来实现协程的功能,python引入了 async 和 await 两个关键字来定义原生的协程
# 可以将 await 看成 yield from 的简写
async def hello():
# 这里面不能用yield,否则会抛异常:AttributeError: 'async_generator' object has no attribute 'send'
# yield 1
print("I am hello")
# await 也只能出现在 async 函数中
r = await world2()
print("喂,%s" % r)
# 或者也可以使用 @type.coroutine 来定义协程
import types
@types.coroutine
def world2():
print("I am world")
yield "李星云"
async def world1():
print("I am world")
return "李星云"
if __name__ == '__main__':
corou = hello()
try:
# 不使用 @types.coroutine 时用这种方式输出
# corou.send(None)
# 使用 @types.coroutine 就只能用这种方式输出
print(corou.send(None))
except StopIteration as e:
pass
感谢支持
更多内容,请移步《超级个体》。