RxPY学习笔记

Catalogue   

基本概念参考这里,Python的版本也是根据这些概念,通过使用自己的特性实现。但由于种种原因,Python的不同版本间变化很大。可以参考这里

基本的用法可以参考文档和源码中的test,这里只记录一些自己遇到的问题和解决方案

官方使用文档翻译

安装

如果使用V3.x版本:

1
pip install rx

如果使用1.6的版本

1
pip install rx==1.6.1

基本概念

RXPY,使用可观察序列,查询操作符组合,可异步化,基于事件的一套Python库。使用Rx,开发者可以使用Observables来表示异步数据流,使用operators查询异步数据流,使用Schedulers对数据/事件流进行异步参数化。

使用Rx,你可以表示多个异步数据流,使用观察者(Observer)对象来订阅(subscribe)事件流。当事件发生时可观察者(Observable)通知订阅者。你可以在Observable和Observer之间放置各种转换。

因为Observable队列是数据流,你可以使用相关操作符进行链式操作。你可以filter,map,reduce,compose,基于时间的操作,等等。除此之外,还有许多其他特定于响应流的操作符允许编写功能强大的查询。取消、异常和同步也可以通过专用的操作符处理。

开始

Observable是ReactiveX的核心类型。它串行的添加元素,像排放系统一样,通过一系列的操作直到最后传递到Observer,在那里它们被消费。

基于推(而不是基于拉)的迭代为表达代码和并发提供了强大的新可能性。因为一个Observable将事件当作数据,也将数据当作事件。

有很多种方式创建Obserable,你可以使用create()工厂方法,传递一个函数给他。

- on_next在Observable发射一个元素时被调用
- on_completed在Obserable完成时被调用
- on_error在Obserable发生错误时被调用

这三个回调是可选的。

让我们看一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from rx import create

def push_five_strings(observer, scheduler):
observer.on_next("Alpha")
observer.on_next("Beta")
observer.on_next("Gamma")
observer.on_next("Delta")
observer.on_next("Epsilon")
observer.on_completed()

source = create(push_five_strings)

source.subscribe(
on_next = lambda i: print("Received {0}".format(i)),
on_error = lambda e: print("Error Occurred: {0}".format(e)),
on_completed = lambda: print("Done!"),
)

Observable通过create创建,push_five_strings被调用。这个函数发射了五个元素。subscribe中实现了三个回调函数。输出结果为:

1
2
3
4
5
6
Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!

of工厂函数,接收list参数,会依此发射它们,如果忽略completion和error,subscribe可以支传一个参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from rx import of

source = of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

source.subscribe(
on_next = lambda i: print("Received {0}".format(i)),
on_error = lambda e: print("Error Occurred: {0}".format(e)),
on_completed = lambda: print("Done!"),
)

输出结果:
Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon

操作符和链接

你可以使用超过130个操作符来导出新的Observables,每个操作符会生产一个Observable,它会以管道的形式改变原来的Observable。比如,我们使用map()处理每个字符串的长度,然后filter()来过滤长度超过5的元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from rx import of, operators as op

source = of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

composed = source.pipe(
op.map(lambda s: len(s)),
op.filter(lambda i: i >= 5)
)
composed.subscribe(lambda value: print("Received {0}".format(value)))

Received 5
Received 5
Received 5
Received 7

还可以使用以下方式将整个操作链接起来,还能减少临时变量的产生

1
2
3
4
5
6
from rx import of, operators as op

of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
op.map(lambda s: len(s)),
op.filter(lambda i: i >= 5)
).subscribe(lambda value: print("Received {0}".format(value)))

自定义操作符

随着操作符链的发展,为了提高代码的可读性,须对链进行拆分。新的操作符被实现为函数,可以直接在管道操作符中使用。当一个操作符被实现为其他操作符的组合时,由于管道函数的存在,实现非常简单

1
2
3
4
5
6
7
8
9
10
11
12
import rx
from rx import operators as ops

def length_more_than_5():
return rx.pipe(
ops.map(lambda s: len(s)),
ops.filter(lambda i: i >= 5),
)

rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
length_more_than_5()
).subscribe(lambda value: print("Received {0}".format(value)))

也可以自己创建一个操作符,以下操作符实现了将字符串转换为小写的功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import rx

def lowercase():
def _lowercase(source):
def subscribe(observer, scheduler = None):
def on_next(value):
observer.on_next(value.lower())

return source.subscribe(
on_next,
observer.on_error,
observer.on_completed,
scheduler)
return rx.create(subscribe)
return _lowercase

rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
lowercase()
).subscribe(lambda value: print("Received {0}".format(value)))

并发控制

为了实现并发,你要使用两个操作符:subscribe_on和observe_on。两者都需要Scheduler来提供线程给订阅者工作。ThreadPoolScheduler是一个重用线程的好选择。

GIL可能会破坏并发性能,因为它阻止多个线程同时访问同一行代码。像NumPy这样的库可以在释放GIL时减轻这种并行密集计算的负担。RxPy还可以在某种程度上减少线程重叠。一定要用并发性测试您的应用程序,并确保有性能增益.

subscribe_on()指示链开始处的源Observable序使用哪个调度器(不管您将该操作符放在哪里)。不过,observe_on()会在这个时候切换到不同的调度程序,有效地将发射从一个线程转移到另一个线程。一些工厂函数和操作符,如interval()和delay(),已经有了默认的调度程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import multiprocessing
import random
import time
from threading import current_thread

import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops


def intense_calculation(value):
# sleep for a random short duration between 0.5 to 2.0 seconds to simulate a long-running calculation
time.sleep(random.randint(5, 20) * 0.1)
return value


# calculate number of CPUs, then create a ThreadPoolScheduler with that number of threads
optimal_thread_count = multiprocessing.cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count)

# Create Process 1
rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
ops.map(lambda s: intense_calculation(s)), ops.subscribe_on(pool_scheduler)
).subscribe(
on_next=lambda s: print("PROCESS 1: {0} {1}".format(current_thread().name, s)),
on_error=lambda e: print(e),
on_completed=lambda: print("PROCESS 1 done!"),
)

# Create Process 2
rx.range(1, 10).pipe(
ops.map(lambda s: intense_calculation(s)), ops.subscribe_on(pool_scheduler)
).subscribe(
on_next=lambda i: print("PROCESS 2: {0} {1}".format(current_thread().name, i)),
on_error=lambda e: print(e),
on_completed=lambda: print("PROCESS 2 done!"),
)

# Create Process 3, which is infinite
rx.interval(1).pipe(
ops.map(lambda i: i * 100),
ops.observe_on(pool_scheduler),
ops.map(lambda s: intense_calculation(s)),
).subscribe(
on_next=lambda i: print("PROCESS 3: {0} {1}".format(current_thread().name, i)),
on_error=lambda e: print(e),
)

input("Press any key to exit\n")

IO异步

许多异步框架都支持IO异步,RxPY中使用相关连的schedulers。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from collections import namedtuple
import asyncio
import rx
import rx.operators as ops
from rx.subject import Subject
from rx.scheduler.eventloop import AsyncIOScheduler

EchoItem = namedtuple('EchoItem', ['future', 'data'])


def tcp_server(sink, loop):
def on_subscribe(observer, scheduler):
async def handle_echo(reader, writer):
print("new client connected")
while True:
data = await reader.readline()
data = data.decode("utf-8")
if not data:
break

future = asyncio.Future()
observer.on_next(EchoItem(
future=future,
data=data
))
await future
writer.write(future.result().encode("utf-8"))

print("Close the client socket")
writer.close()

def on_next(i):
i.future.set_result(i.data)

print("starting server")
server = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
loop.create_task(server)

sink.subscribe(
on_next=on_next,
on_error=observer.on_error,
on_completed=observer.on_completed)

return rx.create(on_subscribe)


loop = asyncio.get_event_loop()
proxy = Subject()
source = tcp_server(proxy, loop)
aio_scheduler = AsyncIOScheduler(loop=loop)

source.pipe(
ops.map(lambda i: i._replace(data="echo: {}".format(i.data))),
ops.delay(5.0)
).subscribe(proxy, scheduler=aio_scheduler)

loop.run_forever()
print("done")
loop.close()

默认的Scheduler

有多种方式选额scheduler。第一种是给操作符提供scheduler。但当有多个操作符时,可能会被忽略。所以第二种方式时提供默认的scheduler。

1
2
3
source.pipe(
...
).subscribe(proxy, scheduler=my_default_scheduler)

操作符选择scheduler的方式如下:

  • 如果给操作符提供了scheduler,选择它
  • 如果默认scheduler有提供,选择它
  • 否则选择自带的

迁移

https://rxpy.readthedocs.io/en/latest/migration.html

从RxPY v1迁移到v3

操作符

创建Observable

操作符 描述
create 创建一个Observable
empty 创建空的Obserbale,发射空元素,马上结束
never
throw 创建一个立马抛出错误的Observable
from_ 转换其他数据或对象给Observable
interval 创建一个间隔执行的Observable
just
range 创建一个整数队列的Observable
repeat_value 重复发送特殊元素或队列元素
start
timer

转换Observables

操作符 描述
buffer
flat_map
group_by
map
scan
window

过滤Observables

操作符 描述
debounce
distinct
element_at
filter
first
ignore_elements
last
sample
skip
skip_last
take
take_last

结合Observables

操作符 描述
combine_latest
join
merge
start_with
switch_latest
zip

错误处理

操作符 描述
catch
retry

实用操作符

操作符 描述
delay
do
materizlize
dematerialize
observe_on
subscribe
subscribe_on
time_interval
timeout
timestamp

条件和布尔操作符

操作符 描述
all
amb
contains
default_if_empty
sequence_equal
skip_until
skip_while
take_until
take_while

数学相关操作符

操作符 描述
average
concat
count
max
min
reduce
sum

可连接Observable操作符

操作符 描述
connect
publish
ref_count
replay

额外的阅读

RxPY源码仓库包含实例notebooks

ReactiveX官方地址和文档:

一些商业内容可以免费获得它们相关联的示例代码

RxPY 3.0.0取消了对backpressure的支持,以下是已知的支持backpress的社区项目

商业资源

O’Reilly Video

O’Reilly has published the video Reactive Python for Data Science which is available on both the O’Reilly Store as well as O’Reilly Safari. This video teaches RxPY from scratch with applications towards data science, but should be helpful for anyone seeking to learn RxPY and reactive programming.

Packt Video

Packt has published the video Reactive Programming in Python, available on Packt store. This video teaches how to write reactive GUI and network applications.

如何在PyQt5中使用scheduler,在非主线程中执行某个操作,然后回到UI线程执行另一个操作。

1
2
3
4
5
rx.from_list([1, 2, 3, 4]).pipe(operators.map(add), operators.subscribe_on(pool_scheduler), operators.observe_on(QtThreadSafeScheduler(QtCore))).subscribe(
on_next=lambda i: print("PROCESS 3: {0} {1}".format(threading.current_thread().name, i)),
on_error=lambda e: print(e),
on_completed=lambda: print("Done!")
)

其中的QtThreadScheduler来自于https://github.com/jcafhe/RxPY

参考