RxPY - 快速指南


RxPY - 概述

本章解释了什么是反应式编程、什么是 RxPY、它的运算符、特性、优点和缺点。

什么是反应式编程?

响应式编程是一种编程范例,处理数据流和变化的传播。这意味着,当一个组件发出数据流时,变化将通过反应式编程库传播到其他组件。变化的传播将持续下去,直到到达最终接收者。

通过使用RxPY,您可以很好地控制异步数据流,例如,可以使用observable跟踪对URL发出的请求,并使用观察者监听请求何时完成以获取响应或错误。

RxPY 使您可以使用Observables处理异步数据流,使用运算符(即过滤器、总和、连接、映射)查询数据流,还可以使用调度程序利用数据流的并发性。创建一个 Observable,为观察者对象提供 on_next(v)、on_error(e) 和 on_completed() 方法,需要订阅该对象,以便我们在事件发生时收到通知。

可观察的

通过使用管道运算符,可以使用链格式的多个运算符来查询 Observable。

RxPY 提供各种类别的运算符,例如:−

  • 数学运算符

  • 转型运营商

  • 过滤运算符

  • 错误处理运算符

  • 公用事业运营商

  • 条件运算符

  • 创建操作符

  • 可连接的操作员

本教程详细解释了这些运算符。

什么是 RxPy?

根据 RxPy 的官方网站( https://rxpy.readthedocs.io/en/latest/),RxPY被定义为一个在 Python 中使用可观察集合和可管道查询运算符来编写异步和基于事件的程序的库

RxPY 是一个支持响应式编程的 python 库。RxPy 代表Python 的响应式扩展。它是一个使用可观察量进行反应式编程的库,可处理异步数据调用、回调和基于事件的程序。

RxPy的特点

在 RxPy 中,以下概念负责处理异步任务 -

可观察的

可观察对象是一种创建观察者并将其附加到具有预期数据流的源的函数,例如推文、计算机相关事件等。

观察者

它是一个具有 on_next()、on_error() 和 on_completed() 方法的对象,当与可观察对象交互时,即源与示例传入的推文等进行交互时,将调用该对象。

订阅

创建可观察量后,要执行可观察量,我们需要订阅它。

运营商

运算符是一个纯函数,它将 observable 作为输入,输出也是一个 observable。您可以通过使用管道运算符对可观察数据使用多个运算符。

主题

主题是一个可观察的序列,也是一个可以多播的观察者,即与许多已订阅的观察者交谈。主题是冷可观察的,即值将在已订阅的观察者之间共享。

调度程序

RxPy 的一项重要特性是并发性,即允许任务并行执行。为了实现这一点,RxPy 有两个运算符 subscribe_on() 和observe_on(),它们与调度程序一起工作,并决定订阅任务的执行。

使用 RxPY 的优点

以下是 RxPy 的优点 -

  • 在处理异步数据流和事件方面,RxPY 是一个很棒的库。RxPY 使用可观察量来进行反应式编程,处理异步数据调用、回调和基于事件的程序。

  • RxPY 提供了大量数学、转换、过滤、实用、条件、错误处理、连接类别等运算符,使反应式编程的使用变得轻松。

  • 并发性,即多个任务一起工作是通过使用 RxPY 中的调度程序来实现的。

  • 使用 RxPY 可以提高性能,因为异步任务的处理和并行处理变得容易。

使用 RxPY 的缺点

  • 使用可观察量调试代码有点困难。

RxPY - 环境设置

在本章中,我们将进行 RxPy 的安装。要开始使用 RxPY,我们需要先安装 Python。因此,我们将开展以下工作 -

  • 安装Python
  • 安装 RxPy

安装Python

访问Python官方网站:https://www.python.org/downloads/。如下所示,然后单击适用于 Windows、Linux/Unix 和 mac os 的最新版本。根据您可用的 64 位或 32 位操作系统下载 Python。

Python

下载后,单击.exe 文件并按照步骤在系统上安装 python。

Python安装

python 包管理器,即 pip 也将通过上述安装默认安装。要使其在您的系统上全局工作,请直接将 python 的位置添加到 PATH 变量中,安装开始时会显示相同的内容,请记住选中“ADD to PATH”复选框。如果您忘记检查它,请按照以下给出的步骤添加到 PATH。

要添加到 PATH,请按照以下步骤操作 -

右键单击计算机图标,然后单击属性 → 高级系统设置。

它将显示如下所示的屏幕 -

系统属性

单击环境变量,如上所示。它将显示如下所示的屏幕 -

环境变量

选择 Path 并单击 Edit 按钮,在末尾添加 python 的位置路径。现在,让我们检查一下 python 版本。

检查 python 版本

E:\pyrx>python --version
Python 3.7.3

安装 RxPY

现在,我们已经安装了 python,我们将安装 RxPy。

一旦安装了 python,python 包管理器,即 pip 也将被安装。以下是检查 pip 版本的命令 -

E:\pyrx>pip --version
pip 19.1.1 from c:\users\xxxx\appdata\local\programs\python\python37\lib\site-
packages\pip (python 3.7)

我们已经安装了pip,版本是19.1.1。现在,我们将使用 pip 安装 RxPy

命令如下 -

pip install rx
点安装 Rx

RxPY - 最新版本更新

在本教程中,我们使用 RxPY 版本 3 和 python 版本 3.7.3。RxPY 版本 3 的工作方式与早期版本(即 RxPY 版本 1)略有不同。

在本章中,我们将讨论这两个版本之间的差异以及更新 Python 和 RxPY 版本时需要完成的更改。

在 RxPY 中可观察到

在 RxPy 版本 1 中,Observable 是一个单独的类 -

from rx import Observable

要使用 Observable,您必须按如下方式使用它 -

Observable.of(1,2,3,4,5,6,7,8,9,10)

在 RxPy 版本 3 中,Observable 直接是 rx 包的一部分。

例子

import rx
rx.of(1,2,3,4,5,6,7,8,9,10)

RxPy 中的运算符

在版本 1 中,运算符是 Observable 类中的方法。例如,要使用运算符,我们必须导入 Observable,如下所示 -

from rx import Observable

运算符用作 Observable.operator,例如,如下所示 -

Observable.of(1,2,3,4,5,6,7,8,9,10)\
   .filter(lambda i: i %2 == 0) \
   .sum() \
   .subscribe(lambda x: print("Value is {0}".format(x)))

对于 RxPY 版本 3,运算符是函数,导入和使用如下 -

import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

使用 Pipe() 方法链接运算符

在 RxPy 版本 1 中,如果您必须在可观察量上使用多个运算符,则必须按如下方式完成 -

例子

from rx import Observable
Observable.of(1,2,3,4,5,6,7,8,9,10)\
   .filter(lambda i: i %2 == 0) \
   .sum() \
   .subscribe(lambda x: print("Value is {0}".format(x)))

但是,在 RxPY 版本 3 的情况下,您可以使用 pipeline() 方法和多个运算符,如下所示 -

例子

import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

RxPY - 使用可观察量

observable 是一个创建观察者并将其附加到需要值的源的函数,例如,来自 dom 元素的单击、鼠标事件等。

本章将详细研究下面提到的主题。

  • 创建可观察对象

  • 订阅并执行 Observable

创建可观察量

为了创建一个 observable,我们将使用create()方法并将具有以下项目的函数传递给它。

  • on_next() - 当 Observable 发出一个项目时,该函数被调用。

  • on_completed() - 当 Observable 完成时调用此函数。

  • on_error() - 当 Observable 发生错误时调用此函数。

要使用 create() 方法,首先导入该方法,如下所示 -

from rx import create

这是一个创建可观察对象的工作示例 -

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error")
   observer.on_completed()
source = create(test_observable).

订阅并执行 Observable

要订阅可观察对象,我们需要使用 subscribe() 函数并传递回调函数 on_next、on_error 和 on_completed。

这是一个工作示例 -

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

subscribe() 方法负责执行 observable。回调函数on_nexton_erroron_completed必须传递给 subscribe 方法。调用 subscribe 方法,依次执行 test_observable() 函数。

并不强制将所有三个回调函数传递给 subscribe() 方法。您可以根据您的要求传递 on_next()、on_error() 和 on_completed()。

lambda 函数用于 on_next、on_error 和 on_completed。它将接受参数并执行给定的表达式。

这是创建的可观察值的输出 -

E:\pyrx>python testrx.py
Got - Hello
Job Done!

RxPY - 操作员

本章详细解释了 RxPY 中的运算符。这些运算符包括 -

  • 与运营商合作
  • 数学运算符
  • 转型运营商
  • 过滤运算符
  • 错误处理运算符
  • 公用事业运营商
  • 条件运算符
  • 创建操作符
  • 可连接的操作员
  • 组合运算符

反应式 (Rx) python 几乎有很多运算符,使 python 编码变得轻松。您可以一起使用这多个运算符,例如,在处理字符串时,您可以使用映射、过滤器、合并运算符。

与运营商合作

您可以使用 pipeline() 方法与多个运算符一起使用。此方法允许将多个运算符链接在一起。

这是使用运算符的工作示例 -

test = of(1,2,3) // an observable
subscriber = test.pipe(
   op1(),
   op2(),
   op3()
)

在上面的示例中,我们使用 of() 方法创建了一个 observable,它接受值 1、2 和 3。现在,在这个 observable 上,您可以使用 pipeline() 方法使用任意数量的运算符来执行不同的操作,如下所示多于。运算符的执行将在给定的可观察量上按顺序进行。

要使用运算符,首先将其导入,如下所示 -

from rx import of, operators as op

这是一个工作示例 -

testrx.py

from rx import of, operators as op
test = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
sub1 = test.pipe(
   op.filter(lambda s: s%2==0),
   op.reduce(lambda acc, x: acc + x)
)
sub1.subscribe(lambda x: print("Sum of Even numbers is {0}".format(x)))

在上面的示例中,有一个数字列表,我们使用过滤运算符从中过滤偶数,然后使用归约运算符添加它。

输出

E:\pyrx>python testrx.py
Sum of Even numbers is 30

这是我们将要讨论的运算符列表 -

  • 创建可观察对象
  • 数学运算符
  • 转型运营商
  • 过滤运算符
  • 错误处理运算符
  • 公用事业运营商
  • 有条件的
  • 可连接
  • 组合运算符

创建可观察对象

以下是可观察的,我们将在创造类别中讨论

显示示例

可观察的 描述
创造 该方法用于创建可观察对象。
空的 这个 observable 不会输出任何东西,直接发出完整的状态。
绝不 此方法创建一个永远不会达到完整状态的可观察对象。
该方法将创建一个会抛出错误的可观察对象。
从_ 该方法会将给定的数组或对象转换为可观察的。
间隔 该方法将给出超时后产生的一系列值。
只是 该方法会将给定值转换为可观察值。
范围 此方法将根据给定的输入给出一系列整数。
重复值 此方法将创建一个可观察对象,该可观察对象将根据给定的计数重复给定的值。
开始 该方法接受一个函数作为输入并返回一个可观察量,该可观察量将从输入函数返回值。
计时器 该方法将在超时完成后按顺序发出值。

数学运算符

我们将在数学运算符类别中讨论的运算符如下: -

显示示例

操作员 描述
平均的 该运算符将从给定的源可观测值计算平均值,并输出具有平均值的可观测值。
连接 该运算符将接受两个或多个可观察量,并给出一个包含序列中所有值的可观察量。
数数

该运算符接收带有值的 Observable,并将其转换为具有单个值的 Observable。count 函数将谓词函数作为可选参数。

该函数是布尔类型,只有满足条件时才会向输出添加值。

最大限度 该运算符将给出一个具有源可观察量最大值的可观察量。
分钟 该运算符将给出一个具有源可观察值最小值的可观察值。
减少 该运算符接受一个称为 Accumulator 函数的函数,该函数用于来自源可观察值的值,并以可观察值的形式返回累积值,并将可选种子值传递给累加器函数。
该运算符将返回一个可观察量,其中包含源可观察量中所有值的总和。

转型运营商

我们将在转换运算符类别中讨论的运算符如下 -

显示示例

操作员 类别
缓冲 该运算符将从源可观察值中收集所有值,并在满足给定边界条件后定期发出它们。
地面依据 该运算符将根据给定的 key_mapper 函数对来自源可观察值的值进行分组。
地图 该运算符将根据给定的 mapper_func 的输出将源可观察值中的每个值更改为新值。
扫描 该运算符将对来自源可观察值的值应用累加器函数,并返回具有新值的可观察值。

过滤运算符

我们将在过滤运算符类别中讨论的运算符如下 -

显示示例

操作员 类别
去抖 该运算符将给出来自可观察源的值,直到给定的时间跨度并忽略其余时间。
清楚的 该运算符将给出与源可观察值不同的所有值。
元素_at 该运算符将为给定的索引给出来自可观察源的元素。
筛选 该运算符将根据给定的谓词函数过滤源可观察值中的值。
第一的 该运算符将给出可观察源中的第一个元素。
忽略元素 该运算符将忽略源可观察值中的所有值,并且仅执行对完成或错误回调函数的调用。
最后的 该运算符将给出可观察源中的最后一个元素。
跳过 该运算符将返回一个可观察值,该可观察值将跳过作为输入的计数项的第一次出现。
跳过最后一个 该运算符将返回一个可观察值,该可观察值将跳过最后一次出现的作为输入的计数项。
该运算符将根据给定的计数以连续顺序给出源值列表。
最后取 该运算符将根据给定的计数从最后一个连续顺序给出源值列表。

错误处理运算符

我们将在错误处理运算符类别中讨论的运算符是:-

显示示例

操作员 描述
抓住 当出现异常时,该运算符将终止源可观察对象。
重试 当出现错误时,该运算符将重试源可观察对象,并且一旦重试计数完成,它将终止。

公用事业运营商

以下是我们将在效用运算符类别中讨论的运算符。

显示示例

操作员 描述
延迟 该操作员将根据给定的时间或日期延迟源可观测发射。
物化 该运算符会将来自可观察源的值转换为以显式通知值形式发出的值。
时间间隔 该运算符将给出源可观察值之间经过的时间。
暂停 该运算符将在经过时间后给出源可观察值的所有值,否则将触发错误。
时间戳 该运算符会将时间戳附加到源可观察值中的所有值。

条件和布尔运算符

我们将在条件和布尔运算符类别中讨论的运算符如下所示 -

显示示例

操作员 描述
全部 该运算符将检查源可观察值中的所有值是否满足给定的条件。
包含 如果给定值存在并且它是源可观察值的值,则该运算符将返回值为 true 或 false 的可观察值。
如果为空则默认 如果源可观察量为空,则该运算符将返回默认值。
序列等于 该运算符将比较两个可观察值序列或值数组,并返回值为 true 或 false 的可观察值。
跳过直到 该运算符将丢弃源可观察量中的值,直到第二个可观察量发出值。
跳过同时 该运算符将返回一个可观察量,其中包含满足传递条件的源可观察量的值。
直到 在第二个 observable 发出值或终止后,该运算符将丢弃源 observable 中的值。
需要一段时间 当条件失败时,该运算符将丢弃源可观察值中的值。

可连接的运营商

我们将在可连接操作符类别中讨论的操作符是 -

显示示例

操作员 描述
发布 该方法会将可观察量转换为可连接的可观察量。
参考计数 该运算符将使可观察量成为普通可观察量。
重播 此方法的工作原理与 replaySubject 类似。即使可观察对象已经发出并且某些订阅者订阅较晚,此方法也将返回相同的值。

组合运算符

以下是我们将在组合运算符类别中讨论的运算符。

显示示例

操作员 描述
组合最新 该运算符将为作为输入给出的可观察值创建一个元组。
合并 该运算符将合并给定的可观察量。
从...开始 该运算符将接受给定值并在源可观察值的开头添加返回完整序列。
压缩 该运算符返回一个可观察量,其值采用元组形式,该元组形式是通过获取给定可观察量的第一个值等形成的。

RxPY - 与主题一起工作

主题是一个可观察的序列,也是一个可以多播的观察者,即与许多已订阅的观察者交谈。

我们将讨论以下主题 -

  • 创建主题
  • 订阅主题
  • 将数据传递给主体
  • Behave主体
  • 重播主题
  • 异步主题

创建主题

要使用主题,我们需要导入主题,如下所示 -

from rx.subject import Subject

您可以按如下方式创建主客体 -

subject_test = Subject()

该对象是一个具有三种方法的观察者 -

  • on_next(值)
  • on_error(错误)和
  • on_completed()

订阅主题

您可以对该主题创建多个订阅,如下所示 -

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

将数据传递给主体

您可以将数据传递给使用 on_next(value) 方法创建的主题,如下所示 -

subject_test.on_next("A")
subject_test.on_next("B")

数据将传递给所有订阅者,添加到主题上。

这是该主题的一个工作示例。

例子

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

subject_test 对象是通过调用Subject() 创建的。subject_test 对象引用了 on_next(value)、on_error(error) 和 on_completed() 方法。上述示例的输出如下所示 -

输出

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

我们可以使用 on_completed() 方法来停止主题执行,如下所示。

例子

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

一旦我们调用complete,后面调用的下一个方法就不会被调用。

输出

E:\pyrx>python testrx.py
The value is A
The value is A

现在让我们看看如何调用 on_error(error) 方法。

例子

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

输出

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

Behave主体

BehaviourSubject 会在调用时为您提供最新值。您可以创建Behave主体,如下所示 -

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

这是一个使用Behave主题的工作示例

例子

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

输出

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

重播主题

重放主体类似于Behave主体,其中,它可以缓冲值并将其重放给新订阅者。这是重播主题的工作示例。

例子

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

重放主题上使用的缓冲区值为 2。因此,最后两个值将被缓冲并用于呼叫的新订阅者。

输出

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

异步主题

对于AsyncSubject,最后调用的值将传递给订阅者,并且只有在调用complete() 方法后才会完成。

例子

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

输出

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2

RxPY - 使用调度程序的并发

RxPy 的一项重要特性是并发性,即允许任务并行执行。为了实现这一点,我们有两个运算符 subscribe_on() 和observe_on(),它们将与调度程序一起工作,这将决定订阅任务的执行。

这是一个工作示例,显示了对 subscibe_on()、observe_on() 和调度程序的需求。

例子

import random
import time
import rx
from rx import operators as ops
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
) 
input("Press any key to exit\n")

在上面的例子中,我有2个任务:任务1和任务2。任务的执行是按顺序的。仅当第一个任务完成后,第二个任务才开始。

输出

E:\pyrx>python testrx.py
From Task 1: 1
From Task 1: 2
From Task 1: 3
From Task 1: 4
From Task 1: 5
Task 1 complete
From Task 2: 1
From Task 2: 2
From Task 2: 3
From Task 2: 4
Task 2 complete

RxPy支持许多Scheduler,在这里,我们将使用ThreadPoolScheduler。ThreadPoolScheduler 主要会尝试管理可用的 CPU 线程。

在我们之前看到的示例中,我们将使用一个多处理模块来提供 cpu_count。该计数将提供给 ThreadPoolScheduler,它将根据可用线程设法使任务并行工作。

这是一个工作示例 -

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
input("Press any key to exit\n")

在上面的示例中,我有 2 个任务,cpu_count 为 4。由于任务为 2,可用线程为 4,因此两个任务都可以并行启动。

输出

E:\pyrx>python testrx.py
Cpu count is : 4
Press any key to exit
From Task 1: 1
From Task 2: 1
From Task 1: 2
From Task 2: 2
From Task 2: 3
From Task 1: 3
From Task 2: 4
Task 2 complete
From Task 1: 4
From Task 1: 5
Task 1 complete

如果您看到输出,则两个任务已并行启动。

现在,考虑一个场景,其中任务超过 CPU 数量,即 CPU 数量为 4,任务为 5。在这种情况下,我们需要检查任务完成后是否有线程获得空闲,这样,就可以分配给队列中可用的新任务。

为此,我们可以使用observe_on() 运算符,它将观察调度程序是否有空闲线程。这是一个使用observe_on()的工作示例

例子

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
#Task 3
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 3: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 3 complete")
)
#Task 4
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 4: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 4 complete")
)
#Task 5
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.observe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 5: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 5 complete")
)
input("Press any key to exit\n")

输出

E:\pyrx>python testrx.py
Cpu count is : 4
From Task 4: 1
From Task 4: 2
From Task 1: 1
From Task 2: 1
From Task 3: 1
From Task 1: 2
From Task 3: 2
From Task 4: 3
From Task 3: 3
From Task 2: 2
From Task 1: 3
From Task 4: 4
Task 4 complete
From Task 5: 1
From Task 5: 2
From Task 5: 3
From Task 3: 4
Task 3 complete
From Task 2: 3
Press any key to exit
From Task 5: 4
Task 5 complete
From Task 1: 4
From Task 2: 4
Task 2 complete
From Task 1: 5
Task 1 complete

如果您看到输出,则任务 4 完成后,线程将被分配给下一个任务,即任务 5,并且任务 5 开始执行。

RxPY - 示例

在本章中,我们将详细讨论以下主题 -

  • 基本示例展示了 observable、操作符和订阅观察者的工作原理。
  • 可观察与主体之间的差异。
  • 了解冷和热可观测值。

下面给出了一个基本示例,展示了可观察对象、运算符和订阅观察者的工作原理。

例子

测试.py

import requests
import rx
import json
from rx import operators as ops
def filternames(x):
   if (x["name"].startswith("C")):
      return x["name"]
   else :
      return ""
content = requests.get('https://jsonplaceholder.typicode.com/users')
y = json.loads(content.text)
source = rx.from_(y)
case1 = source.pipe(
   ops.filter(lambda c: filternames(c)),
   ops.map(lambda a:a["name"])
)
case1.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)), 8. RxPy — Examples
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

这是一个非常简单的例子,其中,我从这个 URL 获取用户数据 -

https://jsonplaceholder.typicode.com/users。

过滤数据,给出以“C”开头的名称,然后使用地图仅返回名称。这是相同的输出 -

E:\pyrx\examples>python test.py
Got - Clementine Bauch
Got - Chelsey Dietrich
Got - Clementina DuBuque
Job Done!

可观察与主体之间的差异

在这个例子中,我们将看到可观察对象和主题之间的区别。

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

输出

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

在上面的例子中,每次你订阅 observable 时,它​​都会给你新的值。

主题示例

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

输出

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

如果您看到值在使用该主题的两个订阅者之间共享。

了解冷热观测值

可观察量被分类为

  • 冷观测
  • 热门观测值

当多个订阅者订阅时,可观察量的差异将会被注意到。

冷观测

冷可观察量是执行的可观察量,并在每次订阅时呈现数据。当它被订阅时,可观察对象被执行并给出新的值。

下面的例子给出了冷可观察的理解。

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

输出

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

在上面的示例中,每次订阅 observable 时,它​​都会执行 observable 并发出值。如上例所示,不同订阅者的值也可能不同。

热门观测值

在热可观察的情况下,它们会在准备好时发出值,并且不会总是等待订阅。当值被发出时,所有订阅者将获得相同的值。

当您希望在 observable 准备就绪时发出值,或者您希望向所有订阅者共享相同的值时,您可以使用热 observable。

热可观察的一个例子是主题和可连接运算符。

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

输出

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

如果您看到,订阅者之间共享相同的值。您可以使用publish()连接可观察运算符来实现相同的目的。