使用协程池(Coroutine Pool)作为RxPY的数据源(Observable)

关于协程与RxPY

协程(coroutine)是一个有很长历史的概念,它是计算机程序的一类组件,推广了协作式多任务的子程序。其详细的概念和历史请参照维基百科中的条目:https://en.wikipedia.org/wiki/Coroutine
Python天生支持的生成器(generator)其实就是协程的一种实现,生成器允许执行被挂起与被恢复。但是由于缺乏更多语法上的支持,以及缺乏利用生成器实现异步编程的成熟模式,限制了生成器作为协程参与协作式多任务编程的用途。不过现在情况发生了改变,Python自3.6版本开始添加了async/await的语法直接支持协程的异步编程,同时在asyncio库中提供了协程编程的接口以及必要的基础实现。在Python里,多个协程是通过消息循环(Evet Loop)的调度从而异步执行的。请参见笔者的另一篇文章,感受一下协程与传统的顺序执行以及多线程之间的联系和区别:Python 协程(Coroutine)体验
RxPY是响应式编程(Reactive programming)在Python中的实现。响应式编程或反应式编程是一种面向数据流和变化传播的声明式编程范式。其详细的信息请参照维基百科中的条目:https://en.wikipedia.org/wiki/Reactive_programming。在RxPY中,数据流事实上是以同步的方式执行的。组成数据处理管道(Data Pipe)的所有函数均以同步的运行栈的方式调度。只是通过全面的依赖反转使得开发者能以方便灵活的声明方式来组装和更改数据处理管道。从而获得极大的灵活性和可扩展性。
在一些特定的情况下,如何结合这两种编程范式,使得它们能够相互协作就变得非常有趣。这样做既能利用Reactive声明编程的方式来简化程序,又能利用协程异步运行的方式来提高IO的并行程度,从而提高程序的执行速度和效率。例如我们有数据处理管道,在其数据源或是数据目标支持异步IO的情况下,就可以考虑二者的结合运用。
这个帖子 https://blog.oakbits.com/rxpy-and-asyncio.html 提供了一些关于二者结合使用的用例和代码实现。在 RxPY 文档 里也提供了一个关于在RxPY中使用asyncio作为数据源的例子。

关于协程池

跟线程相比,协程本身以及其调度都要轻量很多。但这并不意味着协程可以不受限制地部署和运行。其限制主要来自于内外两个方向。内部的限制主要是有限的运行资源例如内存,处理器等。我们知道一个协程对应于一个内存对象,过多的活动协程将有可能会耗尽内存,也可能会导致等待处理器的时间超过IO的等待时间,从而不再能提高程序运行的速度,使得增加的内存资源消耗变得毫无意义。外部的限制主要来自于数据源或数据目标的并行性限制。例如我们要通过数据API分页地获取一个大数据集,而作为数据源的API很可能对能够安全访问的并行度有着或明或暗的限制。超过限制数目的并行请求将有可能被拒绝,甚至会导致API本身运行/数据发生混乱,直至停止服务。协程池是一种在协程环境中限制并行度的简单有效的方法。跟多线程环境下的线程池的概念相似,协程池拥有一定数量的协程,每个协程独立地领取并执行任务。当某个协程完成一个任务时,该协程将继续领取下一个任务直至所有任务完成。除此以外,asynio中提供的同步原语也可以被用来限制并行的程度和活动协程的数量,同步原语的使用不在本文的讨论范围。

程序实列

以下程序可以在Python 3.6+上运行。

import asyncio
import random
import rx
import functools
import selectors
import time


def job_generator():              # 1
    job_id = 1
    while True:
        yield job_id
        job_id += 1


async def worker(worker_ame, job_gen, observer):   # 2
    for job_id in job_gen:
        if job_id >= 200:                          # 3
            job_gen.close()
            break
        await asyncio.sleep(random.uniform(0.01, 0.1))  # 4
        observer.on_next(f'{worker_ame}:  {job_id}')    # 5


def data_source():                                      # 6
    
    def on_subscribe(observer, scheduler):              # 7
        
        async def _aio_sub(loop):                       # 8
            tasks = []                                  # 9
            job_gen = job_generator()
            for i in range(3):
                task = asyncio.create_task(worker(f'Worker-{i}', job_gen, observer))
                tasks.append(task)
            
            # Wait until all worker tasks are finished/cancelled.
            try: 
                await asyncio.gather(*tasks)            # 10
                loop.call_soon(observer.on_completed)
            except Exception as e:                      # 11
                loop.call_soon(functools.partial(observer.on_error, e))
                for task in tasks:
                    task.cancel()
                raise e
            
        selector = selectors.SelectSelector()
        loop = asyncio.SelectorEventLoop(selector)
        asyncio.set_event_loop(loop)
        loop.run_until_complete(_aio_sub(loop))

    return rx.create(on_subscribe)


if __name__ == '__main__':
    started_at = time.monotonic()
    source = data_source()                          # 12
    source.subscribe(                               # 13
        on_next = lambda i: print("Received {0}".format(i)),
        on_error = lambda e: print("Error Occurred: {0}".format(e)),
        on_completed = lambda: print("Done!"),
    )
    total_time = time.monotonic() - started_at      # 14
    print('====')
    print(f'Used {total_time:.2f} seconds')

一个可能的输出看起来像是这个样子:

c:\PortableApps>python RxPYWithAsyncIO.py
Received Worker-0:  1
Received Worker-0:  4
Received Worker-0:  5
Received Worker-1:  2
Received Worker-2:  3
Received Worker-0:  6
Received Worker-2:  8
Received Worker-1:  7
Received Worker-1:  11
Received Worker-0:  9

..........

Received Worker-1:  188
Received Worker-0:  184
Received Worker-2:  190
Received Worker-0:  192
Received Worker-1:  191
Received Worker-1:  195
Received Worker-0:  194
Received Worker-2:  193
Received Worker-1:  196
Received Worker-1:  199
Received Worker-2:  198
Received Worker-0:  197
Done!
====
Used 3.52 seconds

注意在输出中我们期望看到的几处关键内容:

  1. 200行型如 Recieved : 。其中CoroutineName为Worker-0,1,2之一。Job_Id的值为0-199。特别的,输出的Job_Id不会是按照顺序的,这是因为在代码中,每个Job的运行时间是一个随机值。不同job的运行时间有长有短。另外可以观察到所有的协程均参与了任务的处理,它们在输出中交替出现。
  2. 所有Job完成后输出的“Done!”。这是在所有协程完成以后,Observable发送on_complete到注册(Subscribe)的Observer,由Observer打印出的消息。
  3. 最后输出的总的消耗的时间,通常在使用3个协程的情况下是3秒多。可以看到本次运行的时间是3.52秒
    下面让我们沿着代码中标注的序号来做详细的解读。
  4. job_generator是任务生成器,其实例为所有协程共享。工作协程worker从这里领取任务并完成。之后领取下一个任务。从一个唯一的任务生成器领取任务保证了任务不会被重复分发。在访问数据API并且分页获取数据的应用中,任务生成器所产生的内容可能是带有页号的URL;在网络爬虫中,其产生的内容可能是目标网站的地址。注意到这里使用了job而不是task作为其名称,这是为了跟asyncio中的task(对应于一个可并行运行的协程)有所区别。
  5. worker是一个异步函数,运行起来就是一个工作协程。多个worker组成了本例中数据源的协程池。Worker接受一个名字用于输出日志,一个任务生成器job_generator的实例,以及作为数据目标的observer。Worker持续地从任务生成器中领取任务,完成任务,将结果发送到observer。由于协程都是轮流调度的,不会发生多个协程同时运行的情况,因此在协程间共享的数据资源例如job_generator和observer不需要做任何的保护。
  6. 选择在worker中判断结束条件是有现实意义的。很多时候我们不太能够在一开始就确定所有的任务,很可能需要在一个或者多个任务的结果中去判断是否还有余下的工作。例如在分页获取数据的应用中,常常需要在返回的数据集中查看是否有下一页的URL或是判断当前页号是否已经达到或超过了数据集的总页数。当发现结束条件达到的时候,除了结束本协程外,还要通知其他协程尽快结束工作,为此关闭任务生成器是一个好方法。当其他协程完成手上的任务再次尝试领取时会发现已经没有更多的任务了。
  7. 这行代码模拟了一个异步的IO,等待一段随机的时间。在现实环境中通常会使用异步IO获取数据,例如使用aiohttp访问http数据API。
  8. 通过调用observer的on_next方法将任务的结果放入数据处理管道。这里只是简单地将job_id加上处理协程的名字作为结果。现实环境中大多需要处理和转换IO获取的结果数据。注意这里的on_next方法是同步方法,如果管道中的数据处理过于耗时的话,会严重阻塞整个协程池的运行。有必要的话,这里需要使用并行化的方法来保证on_next调用尽快返回,这部分内容不在本文的讨论范围之内。
  9. 一个简单的factory函数,使用RxPY的create方法将函数on_subscribe包装成observable并返回。这是RxPY中常用的手法,详情请参见其文档和实例。
  10. on_subscribe函数是跨越同步和异步世界的桥梁,它本身实现了RxPY关于Observable的接口协议,是一个传统的同步函数。在其内部定义了顶层的异步函数_aio_sub,被驱动运行时首先创建并启动消息循环(Event Loop),使用loop.run_until_complete将异步函数_aio_sub放置并运行在消息循环上。这一步同时也将自己的线程动力(就是活动的运行栈)交到消息循环,并等待_aio_sub运行结束,最后以传统同步的方式结束并退出。
  11. _aio_sub是异步世界的顶层入口。当被消息循环驱动运行的时候,它首先创建协程池,然后等待所有在协程池中的协程运行结束,收集运行结果并返回。
  12. 在协程池中,每个协程worker都将被创建成一个asyncio的task,这使得它们能够被消息循环交替地调度运行,并由创建者等待收集运行结果。本例地代码中可以看出我们部署了3个协程。读者可以自行调整协程和任务的数量,观察总的运行时间。
  13. asyncio.gather有三个作用。等待所有task运行结束;收集所有task的返回值,本例中由于worker本身没有返回值,所以当一切运行顺利的话,收集到的返回值将会全部是空值None;当协程运行中有任何异常抛出时,将会抛出第一个产生的异常。
  14. 本例中当第一个异常被捕获时,程序将异常传递到数据流上从而终止数据流。然后取消所有协程的运行并抛出异常,这事实上终止了数据管道的运行。这样的行为对于分页获取数据的应用是合理的。对于需要爬大量网站的网络爬虫来说,少数网站抛出异常是正常的,在这种情况下应该只将捕获的异常记入日志,然后继续等待协程池的运行直至结束。这需要首先改动协程worker的运行逻辑。
  15. 创建observable作为数据源。RxPY的常见代码。
  16. 在数据源上注册一个最简单的observer。该observer仅仅将收到的数据和事件打印出来。在这一步,复杂的数据处理管道也可以被组装。其最后的注册调用source.subscribe实际上通过运行on_subscribe函数而启动了整个数据管道的运行。我们已经知道RxPY的调用是同步的,当source.subscribe运行结束返回的时候,我们能够确定的是数据管道的运行已经结束,要么所有期望的数据都已经通过管道顺利处理,要么在处理中发生并抛出了异常。
  17. 计算并输出运行时间的简单方法,没有什么需要多解释的。

你可能感兴趣的