当前位置: 首页 > news >正文

免费的网站软件正能量推荐查企业免费版

免费的网站软件正能量推荐,查企业免费版,判断网站做的好坏,义乌外贸建网站https://aosabook.org/en/500L/a-web-crawler-with-asyncio-coroutines.html ——A. Jesse Jiryu Davis and Guido van Rossum 介绍 网络程序消耗的不是计算资源#xff0c;而是打开许多缓慢的连接#xff0c;解决此问题的现代方法是异步IO。 本章介绍一个简单的网络爬虫而是打开许多缓慢的连接解决此问题的现代方法是异步IO。 本章介绍一个简单的网络爬虫使用异步I/O实现。本章分三个部分首先是异步事件循环以及一个带有回调的爬虫。其次我们说明了Python协程高效且可扩展。我们使用生成器实现简单的协程。最后我们使用Python标准库asyncio的协程并使用异步队列来协调。 任务 网络爬虫的功能是下载网站上的所有页面从根URL获取页面并解析页面中的链接并对链接中的页面进行相同的操作直到世界的尽头。我们可以通过并发进行加速同时下载许多页面。 传统方法 如何实现并发的爬虫传统上我们使用线程池。 def fetch(url):sock socket.socket()sock.connect((xkcd.com, 80))request GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n.format(url)sock.send(request.encode(ascii))response bchunk sock.recv(4096)while chunk:response chunkchunk sock.recv(4096)# Page is now downloaded.links parse_links(response)q.add(links)默认情况下socket操作处于阻塞状态线程调用connect或recv方法时它会暂停。因此要一次下载多个页面我们需要许多线程。为了方便线程复用减少创建和销毁线程的开销我们使用线程池。 但线程成本高操作系统可能对线程数量设置了上限。如果我们在并发socket上同时扩展到数以万计的操作我们会在用完socket之前用完线程。每个线程的开销或系统限制是瓶颈。 Async异步 异步 I/O 框架使用非阻塞socket在单个线程上执行并发操作。在我们的异步爬虫程序中我们在开始连接到服务器之前将套接字设置为非阻塞 sock socket.socket() sock.setblocking(False) # 非阻塞 try:sock.connect((xkcd.com, 80)) except BlockingIOError:pass恼火的是非阻塞socket会从connect中抛出异常即使它正常工作。此异常复制了底层的C函数的行为该函数将EINPROGRESS设置为errno通知开始。 现在我们的爬虫需要一种方法来知道连接何时建立以便它可以发送 HTTP 请求。我们可以在一个循环中反复尝试 request GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n.format(url) encoded request.encode(ascii) while True:try:sock.send(encoded)break # Done.except OSError as e:pass print(sent)这种方法不仅浪费电力而且无法有效地等待多个sockets上的事件。在古代BSD Unix 的解决方案是 select 一个 C 函数它等待事件发生在一个非阻塞套接字或它们的一个小数组上。如今对具有大量连接的互联网应用程序的需求导致了像poll 然后是BSD的 kqueue 和 Linux 上的epoll 等替代品的出现。这些 API 与 select 类似但在连接数量非常多的情况下表现良好。 Python 3.4的 DefaultSelector 使用系统上可用的最好的 类select函数。要注册有关网络 I/O 的通知我们创建一个非阻塞套接字并将其注册到默认选择器 from selectors import DefaultSelector, EVENT_WRITE selector DefaultSelector() # 选择器 sock socket.socket() sock.setblocking(False) # 非阻塞套接字 try:sock.connect((xkcd.com, 80)) except BlockingIOError:pass def connected():selector.unregister(sock.fileno())print(connected!) selector.register(sock.fileno(), EVENT_WRITE, connected)我们忽略虚假错误并调用 selector.register() 传入套接字的文件描述符和一个表示我们正在等待的事件的常量。为了在建立连接时收到通知我们传递 EVENT_WRITE 也就是说我们想知道套接字何时是“可写”的。我们还传递了函数 connected 以便在该事件发生时运行。这样的函数称为回调callback。 当选择器接收 I/O 通知时以遍历方式处理 def loop():while True:events selector.select()for event_key, event_mask in events:callback event_key.datacallback()event_key.data作为connected的回调函数 一旦连接了非阻塞套接字我们就会检索并执行。 与上面的快速旋转循环不同对 select()的调用会暂停等待下一个 I/O 事件。然后循环运行正在等待这些事件的回调。尚未完成的操作将保持挂起状态直到事件循环的未来某个滴答声。 我们展示了如何在操作准备就绪时开始操作并执行回调。异步框架基于我们展示的两个功能非阻塞套接字和事件循环构建用于在单个线程上运行并发操作。 我们在这里实现了“并发性”但不是传统上所说的“并行性”。也就是说我们构建了一个执行重叠 I/O 的微型系统。它能够在其他人在飞行过程中开始新的操作。它实际上并不利用多个内核来并行执行计算。但是这个系统是为 I/O 密集型问题而设计的而不是 CPU 密集型问题。 因此我们的事件循环在并发 I/O 时是有效的因为它不会将线程资源专用于每个连接。但在我们继续之前重要的是要纠正一个常见的误解即异步比多线程更快。通常情况并非如此事实上在 Python 中像我们这样的事件循环在处理少量非常活跃的连接时比多线程慢一些。在没有全局解释器锁的运行时中线程在这样的工作负载上会表现得更好。异步 I/O 适用于具有许多缓慢或昏昏欲睡的连接且事件不频繁的应用程序。 使用回调进行编程 使用我们目前构建的简陋的异步框架我们如何构建网络爬虫即使是一个简单的 URL 获取器也很难编写。 我们从尚未获取的 URL 以及看过的 URL 开始 urls_todo set([/]) seen_urls set([/])获取页面将需要一系列回调。连接套接字时触发 connected 回调并向服务器发送 GET 请求。但随后它必须等待响应因此它会注册另一个回调。如果在触发该回调时它还无法读取完整的响应则会再次注册依此类推。 让我们将这些回调收集到一个Fetcher对象中它需要一个 URL、一个套接字对象和一个累积响应字节的位置 class Fetcher:def __init__(self, url):self.response b # Empty array of bytes.self.url urlself.sock None首先调用Fetcher.fetch() # Method on Fetcher class.def fetch(self):self.sock socket.socket()self.sock.setblocking(False)try:self.sock.connect((xkcd.com, 80))except BlockingIOError:pass# Register next callback.selector.register(self.sock.fileno(),EVENT_WRITE,self.connected)fetch() 方法开始连接套接字。但请注意该方法在完全建立连接之前返回。它必须将控制权返回到事件循环以等待连接。为了理解原因想象一下我们的整个应用程序是这样的结构 # Begin fetching http://xkcd.com/353/ fetcher Fetcher(/353/) fetcher.fetch() while True:events selector.select()for event_key, event_mask in events:callback event_key.datacallback(event_key, event_mask)当事件循环调用 select 时所有事件通知都会在事件循环中处理。因此 fetch() 必须将控制权交给事件循环以便程序知道套接字何时连接。只有这样循环才会运行connected 回调该回调已在上面的 fetch 末尾注册。 以下是 connected 的实现 # Method on Fetcher class.def connected(self, key, mask):print(connected!)selector.unregister(key.fd)request GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n.format(self.url)self.sock.send(request.encode(ascii))# Register the next callback.selector.register(key.fd,EVENT_READ,self.read_response)该方法发送 GET 请求。一个真正的应用程序会检查 send 的返回值 以防无法一次发送整个消息。但是我们的请求很小应用程序也不复杂。它轻快地调用 send 然后等待响应。当然它必须注册另一个回调并放弃对事件循环的控制权。下一个也是最后一个回调 read_response 处理服务器的回复 # Method on Fetcher class.def read_response(self, key, mask):global stoppedchunk self.sock.recv(4096) # 4k chunk size.if chunk:self.response chunkelse:selector.unregister(key.fd) # Done reading.links self.parse_links()# Python set-logic:for link in links.difference(seen_urls):urls_todo.add(link)Fetcher(link).fetch() # - New Fetcher.seen_urls.update(links)urls_todo.remove(self.url)if not urls_todo:stopped True每次选择器看到套接字是“可读的”时都会执行回调这可能意味着两件事套接字有数据或已关闭。回调要求从套接字提供最多 4 KB 的数据。如果准备就绪 chunk 包含任何可用的数据。如果有更多 chunk 则长度为 4 KB并且套接字保持可读性因此事件循环会在下一个时钟周期再次运行此回调。响应完成后服务器已关闭套接字 chunk为空。 parse_links 方法未显示返回一组 URL。我们为每个新 URL 启动一个新的提取器没有并发上限。请注意带有回调的异步编程的一个很好的功能我们不需要对共享数据的更改进行互斥锁例如当我们添加指向 seen_urls 的链接时。没有抢占式的多任务处理因此我们不能在代码中的任意点被打断。 我们添加一个全局变量 stopped 用来控制循环 stopped False def loop():while not stopped:events selector.select()for event_key, event_mask in events:callback event_key.datacallback()下载所有页面后提取器将停止循环程序将退出。 这个例子把异步的问题说得很清楚意大利面条代码。我们需要某种方式来表达一系列计算和 I/O 操作并安排多个这样的操作并发运行。但是如果没有线程就无法将一系列操作收集到单个函数中每当函数开始 I/O 操作时它都会显式保存将来需要的任何状态然后返回。您负责思考和编写此保存状态的代码。 让我们解释一下。考虑一下我们使用传统阻塞套接字在线程上获取 URL 是多么简单 # Blocking version. def fetch(url):sock socket.socket()sock.connect((xkcd.com, 80))request GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n.format(url)sock.send(request.encode(ascii))response bchunk sock.recv(4096)while chunk:response chunkchunk sock.recv(4096)# Page is now downloaded.links parse_links(response)q.add(links)此函数在一个套接字操作和下一个套接字操作之间记住什么状态它有套接字、URL 和累积的 response 。在线程上运行的函数使用编程语言的基本功能将此临时状态存储在其堆栈上的局部变量中。该函数还具有“延续”即它计划在 I/O 完成后执行的代码。运行时通过存储线程的指令指针来记住延续。您无需考虑恢复这些局部变量和 I/O 之后的延续。 但是对于基于回调的异步框架在等待 I/O 时函数必须显式保存其状态因为函数会在 I/O 完成之前返回并丢失其堆栈帧。代替局部变量我们基于回调的示例将 sock 和 response 存储 为 Fetcher 实例的 self 属性。代替指令指针它通过注册回调 connected 和 read_response 来存储其延续。随着应用程序功能的增长我们在回调中手动保存的状态的复杂性也在增加。如此繁重的会计工作使编码员容易患偏头痛。 更糟糕的是如果回调在安排链中的下一个回调之前抛出异常会发生什么假设 parse_links() 做得很差并且在解析一些 HTML 时会抛出异常 Traceback (most recent call last):File loop-with-callbacks.py, line 111, in moduleloop()File loop-with-callbacks.py, line 106, in loopcallback(event_key, event_mask)File loop-with-callbacks.py, line 51, in read_responselinks self.parse_links()File loop-with-callbacks.py, line 67, in parse_linksraise Exception(parse error) Exception: parse error堆栈跟踪仅显示事件循环正在运行回调。我们不记得是什么导致了错误。链条的两端都断了我们忘记了我们要去哪里我们从哪里来。这种上下文的丢失被称为“堆栈撕裂”在许多情况下它使调查人员感到困惑。堆栈撕裂还阻止我们为回调链安装异常处理程序就像“try / except”块包装函数调用及其后代树的方式一样。 因此即使除了关于多线程和异步的相对效率的长期争论之外还有另一个关于哪个更容易出错的争论如果线程在同步时出错它们很容易受到数据争用的影响但由于堆栈撕裂回调很难调试。 协程 Coroutines 我们用画饼来吸引你编写异步代码将回调的效率与多线程编程的经典美观相结合。这种组合是通过一种称为“协程”的模式实现的。使用 Python 3.4 的标准 asyncio 库和一个名为“aiohttp”的包在协程中获取 URL 非常直接 asyncio.coroutinedef fetch(self, url):response yield from self.session.get(url)body yield from response.read()协程也是可扩展的。与每个线程 50k 的内存和操作系统对线程的硬性限制相比Python 协程在 Jesse 的系统上仅占用 3k 的内存。Python 可以轻松启动数十万个协程。 协程的概念可以追溯到计算机科学的早期很简单它是一个可以暂停和恢复的子程序。虽然线程由操作系统抢先地进行多任务处理但协程协同处理多任务它们选择何时暂停以及接下来要运行哪个协程。 协程有许多实现;即使在 Python 中也有几个。Python 3.4 中标准“asyncio”库中的协程基于生成器、Future 类和“yield from”语句构建。从 Python 3.5 开始协程是语言本身 的原生特性;但是理解协程最初是在 Python 3.4 中实现的使用预先存在的语言工具是解决 Python 3.5 原生协程的基础。 为了解释 Python 3.4 基于生成器的协程我们将对生成器以及它们如何在 asyncio 中用作协程进行阐述相信您会喜欢阅读它就像我们喜欢编写它一样。一旦我们解释了基于生成器的协程我们将在异步网络爬虫中使用它们。 Python 生成器Generator 的工作原理 在掌握 Python 生成器之前您必须了解常规 Python 函数的工作原理。通常当 Python 函数调用子例程时子例程会保留控制权直到它返回或引发异常。然后控制权返回给调用方 def foo(): ... bar() ...def bar(): ... pass标准的 Python 解释器是用 C 语言编写的。执行 Python 函数的 C 函数被巧妙地称为 PyEval_EvalFrameEx 。它采用一个 Python堆栈帧对象并在帧的上下文中计算 Python 字节码。这是foo的字节码 import disdis.dis(foo)2 0 LOAD_GLOBAL 0 (bar)3 CALL_FUNCTION 0 (0 positional, 0 keyword pair)6 POP_TOP7 LOAD_CONST 0 (None)10 RETURN_VALUEfoo 函数加载 bar 到其堆栈上并调用然后从堆栈中弹出其返回值加载 None 到堆栈上然后返回 None 。 当PyEval_EvalFrameEx 遇到字节码CALL_FUNCTION时 它会创建一个新的 Python 堆栈帧并递归也就是说它以递归方式调用 PyEval_EvalFrameEx 新帧用于执行 bar 。 了解 Python 堆栈帧是在堆内存中分配的这一点至关重要Python 解释器是一个普通的 C 程序所以它的堆栈帧是普通的堆栈帧。但是它操作的 Python 堆栈帧在堆上。令人惊讶的是这意味着 Python 堆栈帧可以比其函数调用更长久。要以交互方式查看此内容请从bar 中保存当前帧 import inspectframe Nonedef foo(): ... bar() ...def bar(): ... global frame ... frame inspect.currentframe() ...foo()# The frame was executing the code for bar.frame.f_code.co_name bar# Its back pointer refers to the frame for foo.caller_frame frame.f_backcaller_frame.f_code.co_name foo舞台已为 Python 生成器搭建它们利用相同的构建块——代码对象和堆栈框架——创造出令人惊叹的效果。 这是一个生成器函数 def gen_fn(): ... result yield 1 ... print(result of yield: {}.format(result)) ... result2 yield 2 ... print(result of 2nd yield: {}.format(result2)) ... return done ... 当 Python 编译 gen_fn 为字节码时它会看到该 yield 语句并知道这是一个 gen_fn 生成器函数而不是常规函数。它设置了一个标志来记住这个事实 # The generator flag is bit position 5.generator_bit 1 5bool(gen_fn.__code__.co_flags generator_bit) True调用生成器函数时Python 会看到生成器标志并且它实际上不会运行该函数而是创建一个生成器 gen gen_fn()type(gen) class generatorPython 生成器封装了一个堆栈帧以及对某些代码的引用即 gen_fn gen.gi_code.co_name gen_fn从调用到 gen_fn 的所有生成器都指向相同的代码。但每个都有自己的堆栈框架。此堆栈帧不在任何实际堆栈上它位于堆内存中等待使用 该帧有一个“最后一条指令(last instruction)”指针即它最近执行的指令。一开始最后一个指令指针是 -1表示生成器尚未开始 gen.gi_frame.f_lasti -1当我们调用send 时生成器到达它的第一个 yield 并暂停。 send 的返回值为 1因为这是 gen 传递给 yield 表达式的内容 gen.send(None) 1生成器的指令指针现在从头开始的 3 个字节是编译后的 Python 的 56 字节的一部分 gen.gi_frame.f_lasti 3len(gen.gi_code.co_code) 56生成器可以在任何时候从任何函数恢复因为它的堆栈帧实际上不在栈(stack)上它在堆(heap)上。它在调用层次结构中的位置不是固定的它不需要像常规函数那样遵循先进后出的执行顺序。它被解放了像云一样自由地漂浮。 我们可以将值“hello”发送到生成器中它成为 yield 表达式的结果生成器继续直到它产生 2 gen.send(hello) result of yield: hello 2它的堆栈帧现在包含局部变量 result gen.gi_frame.f_locals {result: hello}从 gen_fn 中创建的其他生成器将具有自己的堆栈帧和局部变量。 当我们再次调用 send 时生成器从其第二个 yield 继续并通过引发特殊 StopIteration 异常来结束 gen.send(goodbye) result of 2nd yield: goodbye Traceback (most recent call last):File input, line 1, in module StopIteration: done异常有一个值该值是生成器的返回值字符串 “done” 。 使用生成器构建协程 所以生成器可以暂停它可以用一个值恢复它有一个返回值。听起来像是一个很好的原语可以在其上构建异步编程模型而无需意大利面条回调我们想要构建一个“协程”一个与程序中的其他例程协同调度的例程。我们的协程将是 Python 标准“asyncio”库中的协程的简化版本。与asyncio 一样我们将使用生成器、futures和yield from语句。 首先我们需要一种方法来表示协程正在等待的一些future结果。 精简版Future class Future:def __init__(self):self.result Noneself._callbacks []def add_done_callback(self, fn):self._callbacks.append(fn)def set_result(self, result):self.result resultfor fn in self._callbacks:fn(self)Future最初是“悬而未决”的。它通过调用 set_result 来“解决”。 让我们调整我们的Fetcher以使用 futures 和协程。我们为 fetch 写了一个回调 class Fetcher:def fetch(self):self.sock socket.socket()self.sock.setblocking(False)try:self.sock.connect((xkcd.com, 80))except BlockingIOError:passselector.register(self.sock.fileno(),EVENT_WRITE,self.connected)def connected(self, key, mask):print(connected!)# And so on....fetch()方法首先连接套接字然后注册回调connected以便在套接字准备就绪时执行。现在我们可以将这两个步骤合并到一个协程中 def fetch(self):sock socket.socket()sock.setblocking(False)try:sock.connect((xkcd.com, 80))except BlockingIOError:passf Future()def on_connected():f.set_result(None)selector.register(sock.fileno(),EVENT_WRITE,on_connected)yield fselector.unregister(sock.fileno())print(connected!)现在fetch 是一个生成器函数它包含一个 yield 语句。我们创建一个挂起的Future然后用yield暂停直到套接字准备就绪。内部函数 on_connected 决定future。 但是当future确定时谁来恢复生成器我们需要一个协程驱动程序。我们称之为“任务”task class Task:def __init__(self, coro):self.coro corof Future()f.set_result(None)self.step(f)def step(self, future):try:next_future self.coro.send(future.result)except StopIteration:returnnext_future.add_done_callback(self.step) # Begin fetching http://xkcd.com/353/ fetcher Fetcher(/353/) Task(fetcher.fetch()) loop()任务通过 发送None 来启动 fetch 生成器。然后 fetch 运行直到产生一个future任务将其捕获为 next_future。当套接字连接时事件循环运行回调 on_connected它确定future调用 step()恢复 fetch。 分解 yield from 协程 连接套接字后我们发送 HTTP GET 请求并读取服务器响应。这些步骤不再需要分散在回调中;我们将它们收集到相同的生成器函数中 def fetch(self):# ... connection logic from above, then:sock.send(request.encode(ascii))while True:f Future()def on_readable():f.set_result(sock.recv(4096))selector.register(sock.fileno(),EVENT_READ,on_readable)chunk yield fselector.unregister(sock.fileno())if chunk:self.response chunkelse:# Done reading.break这段代码从套接字读取整条消息似乎是通用的。我们如何将其从 fetch 中分解为一个子程序这时Python 3 中著名的 yield from 登场了它允许将生成器委托给另一个生成器。 为了了解如何操作让我们回到简单的生成器示例 def gen_fn(): ... result yield 1 ... print(result of yield: {}.format(result)) ... result2 yield 2 ... print(result of 2nd yield: {}.format(result2)) ... return done ... 要从另一个生成器调用此生成器通过yield from 委托 # Generator function:def caller_fn(): ... gen gen_fn() ... rv yield from gen ... print(return value of yield-from: {} ... .format(rv)) ...# Make a generator from the# generator function.caller caller_fn()caller 生成器的行为就好像它是被委托的生成器gen caller.send(None) 1caller.gi_frame.f_lasti 15caller.send(hello) result of yield: hello 2caller.gi_frame.f_lasti # Hasnt advanced. 15caller.send(goodbye) result of 2nd yield: goodbye return value of yield-from: done Traceback (most recent call last):File input, line 1, in module StopIterationcaller从gen生成值时 不会前进。请注意即使内部生成器gen从一个 yield 语句 前进到下一个语句它的指令指针仍保持在 15 处即其 yield from 语句的位置。 从外部 caller 的角度来看我们无法判断它产生的值是来自 caller 还是来自它委托给的生成器。从内部 gen 我们无法判断值是从内部发送的 caller 还是从外部发送的。yield from 语句是一个无摩擦的通道值通过该通道流入和流出 gen 直到 gen 完成。 协程可以通过yield from 将工作委派给 子协程并接收工作结果。请注意上面 caller 打印了“yield-from done”的返回值。 gen完成后 其返回值成为caller中 yield from的值 rv yield from gen早些时候当我们批评基于回调的异步编程时我们最尖锐的抱怨是关于“堆栈撕裂”当回调抛出异常时堆栈跟踪通常是无用的。它只显示事件循环正在运行回调而不是为什么。协程的表现如何 def gen_fn(): ... raise Exception(my error)caller caller_fn()caller.send(None) Traceback (most recent call last):File input, line 1, in moduleFile input, line 3, in caller_fnFile input, line 2, in gen_fn Exception: my error这更有用堆栈跟踪显示 caller_fn 在抛出错误时委派给 gen_fn 。更令人欣慰的是我们可以将对子协程的调用包装在异常处理程序中普通子例程也是如此 def gen_fn(): ... yield 1 ... raise Exception(uh oh) ...def caller_fn(): ... try: ... yield from gen_fn() ... except Exception as exc: ... print(caught {}.format(exc)) ...caller caller_fn()caller.send(None) 1caller.send(hello) caught uh oh因此就像使用常规子程序一样我们将逻辑分解成子协程。让我们从fetcher中分解出一些子协程。我们编写 read 协程来接收一个块 def read(sock):f Future()def on_readable():f.set_result(sock.recv(4096))selector.register(sock.fileno(), EVENT_READ, on_readable)chunk yield f # Read one chunk.selector.unregister(sock.fileno())return chunkread_all协程接收完整消息 def read_all(sock):response []# Read whole response.chunk yield from read(sock)while chunk:response.append(chunk)chunk yield from read(sock)return b.join(response)如果你眯着眼睛看这些 yield from 语句就会消失这些语句看起来像是执行阻塞 I/O 的传统函数。但实际上 read 和 read_all 是协程。yield from read 会暂停 read_all直到 I/O 完成。当read_all暂停时 asyncio 的事件循环执行其他工作并等待其他 I/O 事件; 一旦read_all就绪就会在下个循环中以read的结果恢复。 在堆栈的根目录下fetch调用read_all class Fetcher:def fetch(self):# ... connection logic from above, then:sock.send(request.encode(ascii))self.response yield from read_all(sock)奇迹般地Task 类不需要修改。它像以前一样驱动外部 fetch 协程 Task(fetcher.fetch()) loop()当read产生一个future时 task通过 yield from的通道接收它就好像future是直接从 fetch 中产生的一样。当循环解决完future时任务将其结果发送到 fetch 中并且该值由 read 接收就像task直接驱动 read 一样 了完善我们的协程实现我们去除了一个瑕疵我们的代码在等待future时使用 yield 但在委托给子协程时使用 yield from 。如果我们在协程暂停时始终使用 yield from 那将更加精致。这样一来协程就不需要关心等待的是什么类型的东西。 我们利用了 Python 中生成器和迭代器之间的深度对应关系。对于调用者来说使用生成器与使用迭代器相同。因此我们通过实现特殊方法__iter__使 Future 类可迭代 # Method on Future class.def __iter__(self):# Tell Task to resume me here.yield selfreturn self.resultFuture的 __iter__ 方法是一个协程序返回Future自身。 现在当我们将代码 # f is a Future. yield f替换为 # f is a Future. yield from f…结果是一样的驱动Task从其对send 的调用 中接收future当future被决定时它会将新结果发送回协程。 统一使用 yield from 有什么好处为什么这比等待具有 yield 的future和委托给具有 yield from 的子协程更好这更好因为现在一个方法可以自由地更改其实现而不影响调用者它可能是一个返回future的普通方法也可能是一个包含 yield from 语句并返回值的协程。对于这两种情况调用者只需要 yield from 方法以等待结果。 耐心的读者我们已经结束了对异步协程的愉快阐述。我们窥视了生成器的机制并勾勒出future和task的实现。我们概述了 asyncio 如何的两全其美并发 I/O 比线程更高效比回调更清晰。当然真正的异步比我们的草图要复杂得多。真正的异步框架解决了零拷贝 I/O、公平调度、异常处理和大量其他功能。 对于 asyncio 用户来说使用协程进行编码比您在这里看到的要简单得多。在上面的代码中我们从第一性原理实现了协程因此您看到了回调、任务和未来。您甚至看到了非阻塞套接字和对 select .但是当需要使用 asyncio 构建应用程序时这些都不会出现在您的代码中。正如我们承诺的那样您现在可以时尚地获取 URL asyncio.coroutinedef fetch(self, url):response yield from self.session.get(url)body yield from response.read()对这个阐述感到满意我们回到我们最初的任务使用 asyncio 编写一个异步网络爬虫。 协调协程 我们首先描述了我们希望我们的爬虫如何工作。现在是时候用 asyncio 协程来实现它了。 我们的爬虫将获取第一页解析其链接并将链接添加到队列中。在此之后它会在整个网站上扇出同时获取页面。但是为了限制客户端和服务器上的负载我们希望运行一些最大数量的工作线程而不是更多。每当worker完成获取页面时它应该立即从队列中拉取下一个链接。我们将经历没有足够的工作可以进行的时期因此一些worker必须暂停。但是当一个worker点击一个包含新链接的页面时队列会突然增加任何暂停的worker都应该醒来并破解。最后完成工作后程序就退出。 想象一下如果worker是线程该如何表达爬虫的算法我们可以使用 Python 标准库中的同步队列。每次将项目放入队列时队列都会递增其“task”计数。工作线程在完成项目工作后进行调用 task_done 。主线程会在Queue.join 阻塞直到队列中的每个项目都与 task_done 调用匹配然后退出。 协程使用与异步队列完全相同的模式首先我们导入Queue try:from asyncio import JoinableQueue as Queue except ImportError:# Python 3.5: asyncio.JoinableQueue is merged into Queue.from asyncio import Queue我们在crawler类中 收集worker的共享状态crawl 方法中编写主逻辑。我们从协程启动 crawl 运行 asyncio 的事件循环直到 crawl 完成 loop asyncio.get_event_loop() crawler crawling.Crawler(http://xkcd.com,max_redirect10) loop.run_until_complete(crawler.crawl())爬虫以根 URL 和 max_redirect某个 URL 的最大重定向数量开始。它将(URL, max_redirect) 放入队列中。原因敬请期待 class Crawler:def __init__(self, root_url, max_redirect):self.max_tasks 10self.max_redirect max_redirectself.q Queue()self.seen_urls set()# aiohttps ClientSession does connection pooling and# HTTP keep-alives for us.self.session aiohttp.ClientSession(looploop)# Put (URL, max_redirect) in the queue.self.q.put((root_url, self.max_redirect))队列中未完成的任务数现在为 1。回到我们的主脚本中我们启动事件循环和 crawl方法 loop.run_until_complete(crawler.crawl())crawl 协程启动了worker。它类似主线程一直在 join 处阻塞直到所有任务完成而worker在后台运行。 asyncio.coroutinedef crawl(self):Run the crawler until all work is done.workers [asyncio.Task(self.work())for _ in range(self.max_tasks)]# When all work is done, exit.yield from self.q.join()for w in workers:w.cancel()如果 worker 是线程我们可能不希望一次启动它们。为了避免在确定需要之前创建昂贵的线程线程池通常会按需增长。但是协程很便宜所以我们从允许的最大数量开始。 有趣的是如何关闭爬虫。当 join future确定时工作线程任务处于活动状态但已暂停它们等待更多 URL但没有一个 URL 出现。因此主协程在退出之前会取消它们。否则当 Python 解释器关闭并调用所有对象的析构函数时活动任务会大叫 ERROR:asyncio:Task was destroyed but it is pending!cancel 如何 工作生成器具有我们尚未向您展示的功能。您可以从外部将异常抛入生成器 gen gen_fn()gen.send(None) # Start the generator as usual. 1gen.throw(Exception(error)) Traceback (most recent call last):File input, line 3, in moduleFile input, line 2, in gen_fn Exception: error生成器由 throw 恢复但现在引发异常。如果生成器的调用堆栈中没有捕获它则异常会冒泡回顶部。因此要取消任务的协程 # Method of Task class.def cancel(self):self.coro.throw(CancelledError)无论生成器在哪里暂停在某个 yield from 语句中它都会恢复并抛出异常。我们在任务的 step 的方法处理取消 # Method of Task class.def step(self, future):try:next_future self.coro.send(future.result)except CancelledError:self.cancelled Truereturnexcept StopIteration:returnnext_future.add_done_callback(self.step)现在任务知道它被取消了所以当它被摧毁时它不会对光的消逝感到愤怒。 一旦 crawl 取消了工作线程它就会退出。事件循环看到协程完成了我们稍后会看到它也会退出 loop.run_until_complete(crawler.crawl())crawl 方法包含了我们的主要协程必须执行的所有操作。工作器协程从队列中获取 URL获取它们并解析它们以获取新链接。每个 worker 独立运行 work 协程 asyncio.coroutinedef work(self):while True:url, max_redirect yield from self.q.get()# Download page and add new links to self.q.yield from self.fetch(url, max_redirect)self.q.task_done()Python 看到此代码包含 yield from 语句并将其编译为生成器函数。所以在 crawl中 当主协程调用self.work 十次时它实际上并没有执行这个方法它只创建十个引用此代码的生成器对象。它将每个包装在一个 Task 中。task接收 生成器产生的future并通过在future确定时以每个future的结果调用send 来驱动生成器。由于生成器有自己的堆栈帧因此它们独立运行具有单独的局部变量和指令指针。 worker通过队列和同伴进行协调通过下面代码等待新的url url, max_redirect yield from self.q.get()队列的 get 方法本身就是一个协程它会暂停直到有人将一个项目放入队列中然后恢复并返回该项目。 顺便说一句这是在爬虫结束时当主协程取消它时worker暂停的位置。从协程的角度来看它在循环中的最后一次行程在 yield from 引发 CancelledError。 当worker获取页面时它会解析链接并将新链接放入队列中然后调用 task_done 以递减计数器。最终worke获取一个页面其 URL 已全部获取并且队列中也没有剩余工作。因此该worke的调用 task_done 将计数器递减为零。然后 crawl 正在等待队列 join 方法的 取消暂停并完成。 我们承诺解释为什么队列中的项目是成对的例如 # URL to fetch, and the number of redirects left. (http://xkcd.com/353, 10)新 URL 还剩下 10 个重定向。获取此特定 URL 会导致重定向到带有尾部斜杠的新位置。我们减少剩余的重定向数量并将下一个位置放入队列中 # URL with a trailing slash. Nine redirects left. (http://xkcd.com/353/, 9)默认情况下 aiohttp 将遵循重定向并给我们最终响应。但是我们告诉它不要这样做并在爬虫中处理重定向因此它可以合并指向同一目的地的重定向路径如果我们已经看到此 URL则它已在 self.seen_urls 并且我们已经从不同的入口点开始此路径 爬虫获取“foo,并看到它重定向到baz因此将baz添加到队列和seen_urls。如果下一页是”bar并且也重定向到baz则不会再次添加baz。如果响应是页面而不是重定向fetch将解析链接并添加新链接到队列。 asyncio.coroutinedef fetch(self, url, max_redirect):# Handle redirects ourselves.response yield from self.session.get(url, allow_redirectsFalse)try:if is_redirect(response):if max_redirect 0:next_url response.headers[location]if next_url in self.seen_urls:# We have been down this path before.return# Remember we have seen this URL.self.seen_urls.add(next_url)# Follow the redirect. One less redirect remains.self.q.put_nowait((next_url, max_redirect - 1))else:links yield from self.parse_links(response)# Python set-logic:for link in links.difference(self.seen_urls):self.q.put_nowait((link, self.max_redirect))self.seen_urls.update(links)finally:# Return connection to pool.yield from response.release()如果这是多线程代码那么在竞争条件下会很糟糕。例如工作线程检查链接是否在 seen_urls中 如果不是则工作线程将其放入队列中并将其添加到 seen_urls 中。如果它在两个操作之间被中断那么另一个工作线程可能会从不同的页面解析相同的链接同时观察它不在seen_urls 中 并将其添加到队列中。现在同一个链接在队列中两次导致充其量重复工作和错误的统计数据。 但是协程仅在语句yield from时会受到中断。这是一个关键的区别使协程代码比多线程代码更不容易发生争用多线程代码必须通过抓取锁来显式进入关键部分否则它是可中断的。默认情况下Python 协程是不可中断的并且仅在显式产生控制权时才放弃控制权。 . 我们不再需要像在基于回调的程序中那样的 fetcher 类。该类是解决回调缺点的方法它们在等待 I/O 时需要一些地方来存储状态因为它们的局部变量不会在调用之间保留。但是 fetch 协程可以像常规函数一样将其状态存储在局部变量中因此不再需要类。 fetch 处理完服务器响应后 它会返回给调用方 work 。work 方法调用队列的task_done 然后从队列中获取下一个要提取的URL。 当fetch将新链接放入队列中时 它会增加未完成任务的计数并暂停正在等待 q.join 的主协程。但是如果没有看不见的链接并且这是队列中的最后一个 URL则work调用 task_done 时 未完成任务的计数将降至零。该事件解除join的暂停主协程完成。 协调 worker 和主协程的队列代码是这样的 class Queue:def __init__(self):self._join_future Future()self._unfinished_tasks 0# ... other initialization ...def put_nowait(self, item):self._unfinished_tasks 1# ... store the item ...def task_done(self):self._unfinished_tasks - 1if self._unfinished_tasks 0:self._join_future.set_result(None)asyncio.coroutinedef join(self):if self._unfinished_tasks 0:yield from self._join_future主协程 crawl 从 join中获取值。 因此当最后一个worker将未完成任务的计数减少到零时它会向crawl 发出恢复并完成的信号。 旅程快结束了。程序从调用crawl开始 loop.run_until_complete(self.crawler.crawl())该程序如何结束由于 crawl 是一个生成器函数调用它将返回一个生成器。为了驱动生成器asyncio 将其包装在一个任务中 class EventLoop:def run_until_complete(self, coro):Run until the coroutine is done.task Task(coro)task.add_done_callback(stop_callback)try:self.run_forever()except StopError:pass class StopError(BaseException):Raised to stop the event loop. def stop_callback(future):raise StopError当任务完成时它会发出 StopError 循环将其用作已正常完成的信号。 但这是什么该任务具有名 为 add_done_callback 和 result 的方法你可能会认为一项任务类似于一个future。你的直觉是正确的。我们必须承认一个关于我们向你隐瞒的任务类的细节task就是future。 class Task(Future):A coroutine wrapped in a Future.通常future是由其他人调用 set_result 解决的。但是当task的协程停止时task会自行解决。请记住当生成器返回时它会抛出特殊 StopIteration 异常 # Method of class Task.def step(self, future):try:next_future self.coro.send(future.result)except CancelledError:self.cancelled Truereturnexcept StopIteration as exc:# Task resolves itself with coros return# value.self.set_result(exc.value)returnnext_future.add_done_callback(self.step)因此当事件循环调用 task.add_done_callback(stop_callback) 时它准备被任务停止。 这是 run_until_complete # Method of event loop.def run_until_complete(self, coro):task Task(coro)task.add_done_callback(stop_callback)try:self.run_forever()except StopError:pass当任务捕获 StopIteration 并解决自己时回调将从循环中引发 StopError。循环停止调用堆栈回到 run_until_complete 。程序完成。 结论 现代程序越来越频繁地受 I/O 限制而不是受 CPU 限制。对于这样的程序来说Python 线程是完美的错误选择全局解释器锁阻止它们实际并行计算而抢占式切换使它们容易出现争用。异步通常是正确的模式。但随着基于回调的异步代码的增长它往往会变得一团糟。协程是一个简洁的替代方案。它们自然而然地融入到子例程中具有合理的异常处理和堆栈跟踪。 如果我们眯着眼睛使yield from 语句变得模糊协程看起来像一个执行传统阻塞 I/O 的线程。我们甚至可以将协程与多线程编程中的经典模式进行协调。没有必要重新发明。因此与回调相比协程对于具有多线程经验的编码人员来说是一个诱人的习惯。 但是当我们睁开眼睛并专注于这些 yield from 语句时我们会看到它们在协程放弃控制权并允许其他人运行时标记点。与线程不同协程显示我们的代码可以中断和不能中断的位置。Glyph Lefkowitz 在他的启发性文章“不屈不挠” 中写道“线程使局部推理变得困难而局部推理也许是软件开发中最重要的事情。然而显式让权使得“通过检查例程本身而不是检查整个系统来理解例程的行为以及由此而来的正确性”成为可能。 本章是在 Python 和异步历史上的复兴时期写成的。2014 年 3 月基于生成器的协程在 Python 3.4 的“asyncio”模块中发布。2015 年 9 月Python 3.5 发布其中内置了协程。这些原生协程使用新语法async def声明而不是“yield from”并且代指协程或等待 Future 时不再使用“yield from”而是使用新的“await”关键字。 尽管取得了这些进展核心思想依然保留。Python 的新原生协程在语法上与生成器将有所区分但工作方式非常类似事实上它们将在 Python 解释器内共享实现。Task、Future 和事件循环将继续在其 asyncio 中扮演它们的角色。 现在你知道了 asyncio 协程的工作原理你可以大部分忘记细节。这个机制被隐藏在一个时髦的接口后面。但是你对基本原理的掌握使你能够在现代异步环境中正确高效地编写代码。
http://www.w-s-a.com/news/981006/

相关文章:

  • j2ee网站开发买什么书网络媒体有哪些
  • 江西省住房建设部官方网站用多说的网站
  • 云课堂哪个网站做的好网站 集约化平台建设方案的通知
  • 撰写网站栏目规划怎么建自己的平台
  • 中国建设银行巴黎分行网站建设银行忘记密码网站首页
  • 网站左侧树形导航怎么做像wordpress一样的网站吗
  • 做网站用的书公司做网站 需要解决哪些问题
  • 电器网站建设策划书深圳动画制作
  • cpa网站建设wordpress支付宝微信收费吗
  • 权威网站排名桂林生活网论坛
  • 网站设计息济南网站建设济南
  • 安蓉建设总公司网站网站怎么做才能被百度收录
  • 电子商务网站业务流程分析做效果图的外包网站
  • wordpress仿站视频教程wordpress用什么php版本好
  • 郑州做网站九零后网络沧州做网站的专业公司
  • 小游戏网站建设可以自己做图片的软件
  • 湖南地税局官网站水利建设基金app仿制
  • 苏州网站设计kgwl建设网站需要用到哪些技术人员
  • 万户网络做网站如何亚马逊网站建设
  • 门户网站制作费用暴雪公司最新消息
  • 深圳专业建网站公司济南公司做网站的价格
  • 怎么运行自己做的网站网上申请平台怎么申请
  • 旅游公司网站 优帮云新闻近期大事件
  • 电商网站后台报价营销软文小短文
  • 网站建设项目售后服务承诺公司名称邮箱大全
  • 湖南网站建设哪里好做ppt的网站叫什么名字
  • 容城县建设银行网站电子商务网站建设子项目
  • 网站管理助手3.0做淘宝网站用什么软件做
  • 贵阳做网站的公司wordpress趣味插件
  • 自己设置免费网站设计平台南京哪里有做公司网站的