RxJava - 快速指南


RxJava - 概述

RxJava 是 ReactiveX 的基于 Java 的扩展。它提供了 Java 中的实现或 ReactiveX 项目。以下是 RxJava 的主要特征。

  • 扩展观察者模式。

  • 支持数据/事件序列。

  • 提供运算符以声明方式将序列组合在一起。

  • 在内部处理线程、同步、线程安全和并发数据结构。

什么是ReactiveX?

ReactiveX 是一个旨在为各种编程语言提供响应式编程概念的项目。响应式编程是指当数据出现时程序做出反应的场景。它是基于事件的编程概念,事件可以传播到寄存器观察者。

根据Reactive,他们结合了观察者模式、迭代器模式和函数式模式的优点。

观察者模式做得很好。ReactiveX 结合了观察者模式、迭代器模式和函数式编程的最佳思想。

函数式编程

函数式编程围绕使用纯函数构建软件进行。纯函数不依赖于先前的状态,并且对于传递的相同参数始终返回相同的结果。纯函数有助于避免与多线程环境中常见的共享对象、可变数据和副作用相关的问题。

反应式编程

响应式编程是指事件驱动的编程,其中数据流以异步方式进入并在到达时进行处理。

函数式反应式编程

RxJava 同时实现了这两个概念,其中流的数据随着时间的推移而变化,消费者函数也会做出相应的反应。

反应式宣言

反应式宣言是一份在线文档,阐述了应用软件系统的高标准。根据宣言,以下是反应式软件的关键属性 -

  • 响应式- 应始终及时响应。

  • 消息驱动- 应在组件之间使用异步消息传递,以便它们保持松散耦合。

  • 弹性- 即使在高负载下也应保持响应。

  • 弹性- 即使任何组件发生故障,也应保持响应。

RxJava的关键组件

RxJava 有两个关键组件:Observables 和 Observer。

  • Observable - 它表示类似于 Stream 的对象,可以发出零个或多个数据,可以发送错误消息,在发出一组数据时可以控制其速度,可以发送有限和无限数据。

  • 观察者- 它订阅可观察的序列数据并对可观察的每个项目做出反应。每当 Observable 发出数据时,观察者都会收到通知。观察者一一处理数据。

如果项目不存在或者前一个项目没有返回回调,则观察者永远不会收到通知。

RxJava - 环境设置

本地环境设置

RxJava 是 Java 的库,因此第一个要求是在您的计算机中安装 JDK。

系统要求

JDK 1.5或以上。
记忆 没有最低要求。
磁盘空间 没有最低要求。
操作系统 没有最低要求。

第 1 步 - 验证计算机中的 Java 安装

首先,打开控制台并根据您正在使用的操作系统执行java命令。

操作系统 任务 命令
Windows 打开命令控制台 c:\> java -版本
Linux 打开命令终端 $ java -版本
苹果 打开终端 机器:< joseph$ java -版本

让我们验证所有操作系统的输出 -

操作系统 输出
Windows

java版本“1.8.0_101”

Java(TM) SE 运行时环境(版本 1.8.0_101)

Linux

java版本“1.8.0_101”

Java(TM) SE 运行时环境(版本 1.8.0_101)

苹果

java版本“1.8.0_101”

Java(TM) SE 运行时环境(版本 1.8.0_101)

如果您的系统上没有安装 Java,请从以下链接下载 Java 软件开发工具包 (SDK):https://www.oracle.com。我们假设 Java 1.8.0_101 作为本教程的安装版本。

第2步-设置JAVA环境

设置JAVA_HOME环境变量以指向计算机上安装 Java 的基本目录位置。例如。

操作系统 输出
Windows 设置环境变量JAVA_HOME为C:\Program Files\Java\jdk1.8.0_101
Linux 导出 JAVA_HOME = /usr/local/java-current
苹果 导出 JAVA_HOME = /Library/Java/Home

将 Java 编译器位置附加到系统路径。

操作系统 输出
Windows 将字符串C:\Program Files\Java\jdk1.8.0_101\bin添加到系统变量Path的末尾。
Linux 导出路径 = $PATH:$JAVA_HOME/bin/
苹果 不需要

如上所述,使用命令java -version验证 Java 安装。

第 3 步 - 下载 RxJava2 存档

从RxJava @ MVNRepository 及其依赖项Reactive Streams @ MVNRepository下载最新版本的 RxJava jar 文件 。在撰写本教程时,我们已经下载了 rxjava-2.2.4.jar、reactive-streams-1.0.2.jar 并将其复制到 C:\>RxJava 文件夹中。

操作系统 档案名称
Windows rxjava-2.2.4.jar、reactive-streams-1.0.2.jar
Linux rxjava-2.2.4.jar、reactive-streams-1.0.2.jar
苹果 rxjava-2.2.4.jar、reactive-streams-1.0.2.jar

第 4 步 - 设置 RxJava 环境

将RX_JAVA环境变量设置为指向计算机上存储 RxJava jar 的基本目录位置。假设我们已将 rxjava-2.2.4.jar 和reactive-streams-1.0.2.jar 存储在 RxJava 文件夹中。

先生编号 操作系统和描述
1

Windows

将环境变量 RX_JAVA 设置为 C:\RxJava

2

Linux

导出 RX_JAVA = /usr/local/RxJava

3

苹果

导出 RX_JAVA = /Library/RxJava

第 5 步 - 设置 CLASSPATH 变量

设置CLASSPATH环境变量以指向 RxJava jar 位置。

先生编号 操作系统和描述
1

Windows

设置环境变量 CLASSPATH 为 %CLASSPATH%;%RX_JAVA%\rxjava-2.2.4.jar;%RX_JAVA%\reactive-streams-1.0.2.jar;.;

2

Linux

导出 CLASSPATH = $CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar:。

3

苹果

导出 CLASSPATH = $CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar:。

第 6 步 - 测试 RxJava 设置

创建一个类 TestRx.java 如下所示 -

import io.reactivex.Flowable;
public class TestRx {
   public static void main(String[] args) {
      Flowable.just("Hello World!")
         .subscribe(System.out::println);
   }
}

第 7 步 - 验证结果

使用javac编译器编译类,如下所示 -

C:\RxJava>javac Tester.java

验证输出。

Hello World!

RxJava - Observable 的工作原理

Observables代表观察者(订阅者)监听的数据源。简而言之,Observable 发出项目,然后 Subscriber 消费这些项目。

可观察的

  • 一旦订阅者开始监听,Observable 就会提供数据。

  • Observable 可以发出任意数量的项目。

  • Observable 可以只发出完成信号,也可以不发出任何项目。

  • Observable 可以成功终止。

  • Observable 可能永远不会终止。例如,一个按钮可以被点击任意多次。

  • Observable 可能在任何时间点抛出错误。

订户

  • Observable 可以有多个订阅者。

  • 当 Observable 发出一个项目时,每个订阅者的 onNext() 方法都会被调用。

  • 当 Observable 完成发出项目时,每个订阅者的 onComplete() 方法都会被调用。

  • 如果 Observable 发出错误,每个订阅者的 onError() 方法都会被调用。

RxJava - 创建 Observables

以下是创建可观察量的基类。

  • Flowable - 0..N 个流,发出 0 或 n 个项目。支持反应流和背压。

  • 可观察- 0..N 流量,但没有背压。

  • 单一- 1 项或错误。可以被视为方法调用的反应式版本。

  • 可完成- 没有发出任何项目。用作完成或错误的信号。可以被视为 Runnable 的响应式版本。

  • MayBe - 没有发出任何物品或发出 1 个物品。可以被视为可选的响应式版本。

以下是在 Observable 类中创建可观察对象的便捷方法。

  • just(T item) - 返回一个 Observable,它指示给定的(常量引用)项,然后完成。

  • fromIterable(Iterable source) - 将 Iterable 序列转换为发出序列中项目的 ObservableSource。

  • fromArray(T... items) - 将数组转换为发出数组中项目的 ObservableSource。

  • fromCallable(Callable seller) - 返回一个 Observable,当观察者订阅它时,调用您指定的函数,然后发出从该函数返回的值。

  • fromFuture(Future future) - 将 Future 转换为 ObservableSource。

  • Interval(longinitialDelay, long period, TimeUnitunit) - 返回一个 Observable,它在初始延迟后发出 0L,并在其后的每个时间段后发出不断增加的数字。

RxJava - 单个可观察的

Single 类表示单值响应。单个可观察对象只能发出单个成功值或错误。它不会发出 onComplete 事件。

类别声明

以下是io.reactivex.Single<T>类的声明-

public abstract class Single<T>
   extends Object
      implements SingleSource<T>

协议

以下是 Single Observable 运行的顺序协议 -

onSubscribe (onSuccess | onError)?

单个示例

使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {
      //Create the observable
      Single<String> testSingle = Single.just("Hello World");

      //Create an observer
      Disposable disposable = testSingle
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(
         new DisposableSingleObserver<String>() {

         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

验证结果

使用javac编译器编译该类,如下所示 -

C:\RxJava>javac ObservableTester.java

现在按如下方式运行 ObservableTester -

C:\RxJava>java ObservableTester

它应该产生以下输出 -

Hello World

RxJava - 也许可观察

MayBe 类代表延迟响应。MayBe observable 可以发出单个成功值或不发出值。

类别声明

以下是io.reactivex.Single<T>类的声明-

public abstract class Maybe<T>
   extends Object
      implements MaybeSource<T>

协议

以下是 MayBe Observable 运行的顺序协议 -

onSubscribe (onSuccess | onError | OnComplete)?

也许是例子

使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      //Create an observer
      Disposable disposable = Maybe.just("Hello World")
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

验证结果

使用javac编译器编译该类,如下所示 -

C:\RxJava>javac ObservableTester.java

现在按如下方式运行 ObservableTester -

C:\RxJava>java ObservableTester

它应该产生以下输出 -

Hello World

RxJava - 完整的可观察的

Completable 类代表延迟响应。Completable observable 可以指示成功完成或错误。

类别声明

以下是io.reactivex.Completable类的声明-

public abstract class Completable
extends Object
implements CompletableSource

协议

以下是 Completable Observable 运行的顺序协议 -

onSubscribe (onError | onComplete)?

可完成的例子

使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableCompletableObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {

      //Create an observer
      Disposable disposable = Completable.complete()
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableCompletableObserver() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }
         @Override
         public void onStart() {
            System.out.println("Started!");
         }
         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

验证结果

使用javac编译器编译该类,如下所示 -

C:\RxJava>javac ObservableTester.java

现在按如下方式运行 ObservableTester -

C:\RxJava>java ObservableTester

它应该产生以下输出 -

Started!
Done!

RxJava - 使用 CompositeDisposable

CompositeDisposable 类表示一个可以容纳多个一次性物品的容器,并提供添加和删除一次性物品的 O(1) 复杂度。

类别声明

以下是io.reactivex.disposables.CompositeDisposable类的声明-

public final class CompositeDisposable
extends Object
implements Disposable, io.reactivex.internal.disposables.DisposableContainer

复合材料一次性示例

使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {    
      CompositeDisposable compositeDisposable = new CompositeDisposable();

      //Create an Single observer 
      Disposable disposableSingle = Single.just("Hello World")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(
      new DisposableSingleObserver<String>() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      }); 

      //Create an observer
      Disposable disposableMayBe = Maybe.just("Hi")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 

      Thread.sleep(3000);

      compositeDisposable.add(disposableSingle);
      compositeDisposable.add(disposableMayBe);

      //start observing
      compositeDisposable.dispose();
   }
}

验证结果

使用javac编译器编译该类,如下所示 -

C:\RxJava>javac ObservableTester.java

现在按如下方式运行 ObservableTester -

C:\RxJava>java ObservableTester

它应该产生以下输出 -

Hello World
Hi

RxJava - 创建运算符

以下是用于创建 Observable 的运算符。

先生。 运算符及描述
1

创造

从头开始创建一个 Observable 并允许以编程方式调用观察者方法。

2

推迟

在观察者订阅之前不要创建 Observable。为每个观察者创建一个新的可观察值。

3

清空/从不/抛出

创建一个具有有限Behave的 Observable。

4

将对象/数据结构转换为 Observable。

5

间隔

创建一个 Observable,按指定的时间间隔按顺序发射整数。

6

只是

将对象/数据结构转换为 Observable 以发出相同或相同类型的对象。

7

范围

创建一个 Observable,按给定范围的顺序发出整数。

8

重复

创建一个 Observable 并按顺序重复发射整数。

9

开始

创建一个 Observable 来发出函数的返回值。

10

定时器

创建一个 Observable 以在给定的延迟后发出单个项目。

创建算子示例

使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//Using fromArray operator to create an Observable
public class ObservableTester  {
   public static void main(String[] args) { 
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

验证结果

使用javac编译器编译该类,如下所示 -

C:\RxJava>javac ObservableTester.java

现在按如下方式运行 ObservableTester -

C:\RxJava>java ObservableTester

它应该产生以下输出 -

ABCDEFG

RxJava - 转换运算符

以下是用于转换从 Observable 发出的项目的运算符。

先生。 运算符及描述
1

缓冲

定期将 Observable 中的项目收集到包中,然后发出包而不是项目。

2

平面地图

用于嵌套可观察量。将项目转换为可观察对象。然后将这些项目扁平化为单个 Observable。

3

通过...分组

将一个 Observable 分成按键组织的一组 Observable,以发出不同的项目组。

4

地图

对每个发出的项目应用一个函数来对其进行转换。

5

扫描

按顺序将函数应用于每个发出的项目,然后发出连续的值。

6

窗户

定期将 Observable 中的项目收集到 Observable 窗口中,然后发出窗口而不是项目。

转换运算符示例

使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//Using map operator to transform an Observable
public class ObservableTester  { 
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

验证结果

使用javac编译器编译该类,如下所示 -

C:\RxJava>javac ObservableTester.java

现在按如下方式运行 ObservableTester -

C:\RxJava>java ObservableTester

它应该产生以下输出 -

ABCDEFG

RxJava - 过滤运算符

以下是用于有选择地从 Observable 发出项目的运算符。

先生。 运算符及描述
1

去抖动

仅当发生超时时才发出项目,而不发出其他项目。

2

清楚的

只发出独特的物品。

3

元素At

只发出 Observable 发出的 n 索引处的项目。

4

筛选

仅发出那些通过给定谓词函数的项目。

5

第一的

发出第一个项目或通过给定标准的第一个项目。

6

忽略元素

不要从 Observable 发出任何项目,但标记完成。

7

最后的

发出 Observable 中的最后一个元素。

8

样本

以给定的时间间隔发出最新的项目。

9

跳过

跳过 Observable 中的前 n 个项目。

10

跳过最后一个

跳过 Observable 中的最后 n 个项目。

11

从 Observable 中获取前 n 个项目。

12

最后取

从 Observable 中获取最后 n 个项目。

过滤运算符示例

使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//Using take operator to filter an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .take(2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

验证结果

使用javac编译器编译该类,如下所示 -

C:\RxJava>javac ObservableTester.java

现在按如下方式运行 ObservableTester -

C:\RxJava>java ObservableTester

它应该产生以下输出 -

ab

RxJava - 组合运算符

以下是用于从多个 Observable 创建单个 Observable 的运算符。

先生。 运算符及描述
1 和/然后/当

使用模式和计划中介组合项目集。

2 结合最新

通过指定的函数组合每个 Observable 发出的最新项并发出结果项。

3 加入

如果在第二个 Observable 发出的项目的时间范围内发出,则合并两个 Observable 发出的项目。

4 合并

组合 Observables 发出的项目。

5 从...开始

在开始从源 Observable 发出项目之前发出指定的项目序列

6 转变

发出 Observables 发出的最新项目。

7 压缩

根据功能组合 Observables 的项目并发出结果项目。

组合运算符示例

使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//Using combineLatest operator to combine Observables
public class ObservableTester {
   public static void main(String[] args) {    
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.combineLatest(observable1, observable2, (a,b) -> a + b)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

验证结果

使用javac编译器编译该类,如下所示 -

C:\RxJava>javac ObservableTester.java

现在按如下方式运行 ObservableTester -

C:\RxJava>java ObservableTester

它应该产生以下输出 -

g1g2g3g4g5g6

RxJava - 实用操作符

以下是对 Observables 经常有用的运算符。

先生。 运算符及描述
1

延迟

注册操作来处理 Observable 生命周期事件。

2

物化/非物质化

表示发出的项目和发送的通知。

3

观察

指定要观察的调度程序。

4

连载

强制 Observable 进行序列化调用。

5

订阅

对项目和通知的发射进行操作,例如从可观察到的完成

6

订阅

指定 Observable 订阅时要使用的调度程序。

7

时间间隔

转换 Observable 以发出发射之间经过的时间量的指示。

8

暂停

如果指定时间没有发出任何项目,则发出错误通知。

9

时间戳

将时间戳附加到发出的每个项目。

9

使用

创建一次性资源或与 Observable 相同的生命周期。

公用事业运营商示例

使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//Using subscribe operator to subscribe to an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable.subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

验证结果

使用javac编译器编译该类,如下所示 -

C:\RxJava>javac ObservableTester.java

现在按如下方式运行 ObservableTester -

C:\RxJava>java ObservableTester

它应该产生以下输出 -

abcdefg

RxJava - 条件运算符

以下是评估一个或多个 Observables 或发出的项目的运算符。

先生。 运算符及描述
1

全部

评估发出的所有项目是否满足给定标准。

2

安布

仅在给定多个 Observable 的情况下发出第一个 Observable 中的所有项目。

3

包含

检查 Observable 是否发出特定项目。

4

默认为空

如果 Observable 不发出任何东西,则发出默认项。

5

序列相等

检查两个 Observable 是否发出相同的项目序列。

6

跳至

丢弃第一个 Observable 发出的项目,直到第二个 Observable 发出一个项目。

7

跳过时

丢弃由 Observable 发出的项,直到给定条件变为 false。

8

直到

在第二个 Observable 发出一个项目或终止后,丢弃由一个 Observable 发出的项目。

9

稍事休息

在指定条件变为 false 后丢弃由 Observable 发出的项目。

条件运算符示例

使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//Using defaultIfEmpty operator to operate on an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      final StringBuilder result = new StringBuilder();
      Observable.empty()
      .defaultIfEmpty("No Data")
      .subscribe(s -> result.append(s));
      System.out.println(result);
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result1 = new StringBuilder();
      Observable.fromArray(letters)
      .firstElement()
      .defaultIfEmpty("No data")   
      .subscribe(s -> result1.append(s));
      System.out.println(result1);
   }
}

验证结果

使用javac编译器编译该类,如下所示 -

C:\RxJava>javac ObservableTester.java

现在按如下方式运行 ObservableTester -

C:\RxJava>java ObservableTester

它应该产生以下输出 -

No Data
a

RxJava - 数学运算符

以下是对 Observable 发出的整个项目进行操作的运算符。

先生。 运算符及描述
1

平均的

评估所有项目的平均值并发出结果。

2

康卡特

不交错地从多个 Observable 发出所有项目。

3

数数

计算所有项目并发出结果。

4

最大限度

评估所有项目中的最大值并发出结果。

5

最小

评估所有项目中的最小值项目并发出结果。

6

减少

对每个项目应用一个函数并返回结果。

7

评估所有项目的总和并发出结果。

数学运算符示例

使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//Using concat operator to operate on multiple Observables
public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {    
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.concat(observable1, observable2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

验证结果

使用javac编译器编译该类,如下所示 -

C:\RxJava>javac ObservableTester.java

现在按如下方式运行 ObservableTester -

C:\RxJava>java ObservableTester

它应该产生以下输出 -

abcdefg123456

RxJava - 可连接运算符

以下是对订阅有更精确控制的运营商。

先生。 运算符及描述
1

连接

指示可连接的 Observable 向其订阅者发出项目。

2

发布

将 Observable 转换为可连接的 Observable。

3

参考计数

将可连接的 Observable 转换为普通的 Observable。

4

重播

确保每个订阅者都能看到相同的发射项目序列,即使在 Observable 已经开始发射项目并且订阅者稍后订阅之后也是如此。

可连接操作器示例

使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
//Using connect operator on a ConnectableObservable
public class ObservableTester {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      ConnectableObservable<String> connectable = Observable.fromArray(letters).publish();      
      connectable.subscribe(letter -> result.append(letter));
      System.out.println(result.length());
      connectable.connect();
      System.out.println(result.length());
      System.out.println(result);
   }
}

验证结果

使用javac编译器编译该类,如下所示 -

C:\RxJava>javac ObservableTester.java

现在按如下方式运行 ObservableTester -

C:\RxJava>java ObservableTester

它应该产生以下输出 -

0
7
abcdefg

RxJava - 主题

根据Reactive,Subject 既可以充当 Observable 也可以充当 Observer。

主题是一种桥梁或代理,在 ReactiveX 的某些实现中可用,它既充当观察者又充当可观察者。因为它是一个观察者,所以它可以订阅一个或多个Observable,并且因为它是一个Observable,所以它可以通过重新发送它们来传递它所观察到的项目,并且它还可以发送新的项目。

有四种类型的主题 -

先生。 主题和描述
1

发布主题

仅发出订阅时间后发出的那些项目。

2 重播主题

发出由源 Observable 发出的所有项目,无论它何时订阅了 Observable。

3

Behave主体

订阅后,发出最新的项目,然后继续发出源 Observable 发出的项目。

4

异步主题

发射源 Observable 完成发射后发射的最后一项。

RxJava - 发布主题

PublishSubject 向当前订阅的观察者发送项目,并向当前或晚期观察者发送终端事件。

类别声明

以下是io.reactivex.subjects.PublishSubject<T>类的声明-

public final class PublishSubject<T>
extends Subject<T>

发布主题示例

使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import io.reactivex.subjects.PublishSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      PublishSubject<String> subject = PublishSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be abcd 
      System.out.println(result1);
      //Output will be d only
      //as subscribed after c item emitted.
      System.out.println(result2);
   }
}

验证结果

使用javac编译器编译该类,如下所示 -

C:\RxJava>javac ObservableTester.java

现在按如下方式运行 ObservableTester -

C:\RxJava>java ObservableTester

它应该产生以下输出 -

abcd
d

RxJava - Behave主题

BehaviorSubject 向每个订阅的观察者发出它观察到的最新项目,然后发送所有后续观察到的项目。

类别声明

以下是io.reactivex.subjects.BehaviorSubject<T>类的声明-

public final class BehaviorSubject<T>
extends Subject<T>

Behave主体示例

使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import io.reactivex.subjects.BehaviorSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         
      BehaviorSubject<String> subject =  BehaviorSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();
      //Output will be abcd
      System.out.println(result1);
      //Output will be cd being BehaviorSubject 
      //(c is last item emitted before subscribe)
      System.out.println(result2);
   }
}

验证结果

使用javac编译器编译该类,如下所示 -

C:\RxJava>javac ObservableTester.java

现在按如下方式运行 ObservableTester -

C:\RxJava>java ObservableTester

它应该产生以下输出 -

abcd
cd

RxJava-ReplaySubject

ReplaySubject 向当前和晚期观察者重播事件/项目。

类别声明

以下是io.reactivex.subjects.ReplaySubject<T>类的声明-

public final class ReplaySubject<T>
extends Subject<T>

重播主题示例

使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import io.reactivex.subjects.ReplaySubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      ReplaySubject<String> subject = ReplaySubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be abcd
      System.out.println(result1);
      //Output will be abcd being ReplaySubject
      //as ReplaySubject emits all the items
      System.out.println(result2);
   }
}

验证结果

使用javac编译器编译该类,如下所示 -

C:\RxJava>javac ObservableTester.java

现在按如下方式运行 ObservableTester -

C:\RxJava>java ObservableTester

它应该产生以下输出 -

abcd
abcd

RxJava-AsyncSubject

AsyncSubject 向观察者发出唯一的最后一个值,后跟完成事件或收到的错误。

类别声明

以下是io.reactivex.subjects.AsyncSubject<T>类的声明-

public final class  AsyncSubject<T>
extends Subject<T>

异步主题示例

使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import io.reactivex.subjects. AsyncSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      AsyncSubject<String> subject =  AsyncSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be d being the last item emitted
      System.out.println(result1);
      //Output will be d being the last item emitted     
      System.out.println(result2);
   }
}

验证结果

使用javac编译器编译该类,如下所示 -

C:\RxJava>javac ObservableTester.java

现在按如下方式运行 ObservableTester -

C:\RxJava>java ObservableTester

它应该产生以下输出 -

d
d

RxJava - 调度程序

调度程序用于多线程环境中与 Observable 运算符一起使用。

根据反应式,调度程序用于调度运算符链如何应用于不同的线程。

默认情况下,Observable 和应用到它的运算符链将在调用其 Subscribe 方法的同一线程上完成其工作,并通知其观察者。SubscribeOn 运算符通过指定 Observable 应在其上运行的不同调度程序来更改此Behave。ObserveOn 运算符指定一个不同的调度程序,Observable 将使用该调度程序向其观察者发送通知。

RxJava 中有以下类型的调度程序 -

先生。 调度程序和描述
1

Schedulers.computation()

创建并返回用于计算工作的调度程序。要调度的线程数取决于系统中存在的 CPU。每个 CPU 允许一个线程。最适合事件循环或回调操作。

2

调度程序.io()

创建并返回用于 IO 绑定工作的调度程序。线程池可以根据需要进行扩展。

3

Schedulers.newThread()

创建并返回一个为每个工作单元创建一个新线程的调度程序。

4

调度程序.trampoline()

创建并返回一个调度程序,该调度程序将当前线程上的工作排队,以便在当前工作完成后执行。

4

Schedulers.from(java.util.concurrent.Executor执行器)

将 Executor 转换为新的 Scheduler 实例。

RxJava - 蹦床调度程序

Schedulers.trampoline() 方法创建并返回一个 Scheduler,该 Scheduler 将当前线程上的工作排队,以便在当前工作完成后执行。

Schedulers.trampoline() 示例

使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.trampoline()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

验证结果

使用javac编译器编译该类,如下所示 -

C:\RxJava>javac ObservableTester.java

现在按如下方式运行 ObservableTester -

C:\RxJava>java ObservableTester

它应该产生以下输出 -

Processing Thread main
Receiver Thread main, Item length 1
Processing Thread main
Receiver Thread main, Item length 2
Processing Thread main
Receiver Thread main, Item length 3

RxJava - 新线程调度程序

Schedulers.newThread() 方法创建并返回一个调度程序,该调度程序为每个工作单元创建一个新线程。

Schedulers.newThread() 示例

使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.newThread()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

验证结果

使用javac编译器编译该类,如下所示 -

C:\RxJava>javac ObservableTester.java

现在按如下方式运行 ObservableTester -

C:\RxJava>java ObservableTester

它应该产生以下输出 -

Processing Thread RxNewThreadScheduler-1
Receiver Thread RxNewThreadScheduler-1, Item length 1
Processing Thread RxNewThreadScheduler-2
Receiver Thread RxNewThreadScheduler-2, Item length 2
Processing Thread RxNewThreadScheduler-3
Receiver Thread RxNewThreadScheduler-3, Item length 3

RxJava - 计算调度程序

Schedulers.computation() 方法创建并返回用于计算工作的调度程序。要调度的线程数取决于系统中存在的 CPU。每个 CPU 允许一个线程。最适合事件循环或回调操作。

Schedulers.computation() 示例

使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.computation()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

验证结果

使用javac编译器编译该类,如下所示 -

C:\RxJava>javac ObservableTester.java

现在按如下方式运行 ObservableTester -

C:\RxJava>java ObservableTester

它应该产生以下输出 -

Processing Thread RxComputationThreadPool-1
Receiver Thread RxComputationThreadPool-1, Item length 1
Processing Thread RxComputationThreadPool-2
Receiver Thread RxComputationThreadPool-2, Item length 2
Processing Thread RxComputationThreadPool-3
Receiver Thread RxComputationThreadPool-3, Item length 3

RxJava - IO 调度程序

Schedulers.io() 方法创建并返回一个用于 IO 绑定工作的调度程序。线程池可以根据需要进行扩展。最适合 I/O 密集型操作。

Schedulers.io() 示例

使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.io()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

验证结果

使用javac编译器编译该类,如下所示 -

C:\RxJava>javac ObservableTester.java

现在按如下方式运行 ObservableTester -

C:\RxJava>java ObservableTester

它应该产生以下输出 -

Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 1
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 2
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 3

RxJava - 来自调度程序

Schedulers.from(Executor) 方法将 Executor 转换为新的 Scheduler 实例。

Schedulers.from(Executor) 示例

使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import java.util.Random;
import java.util.concurrent.Executors;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

验证结果

使用javac编译器编译该类,如下所示 -

C:\RxJava>javac ObservableTester.java

现在按如下方式运行 ObservableTester -

C:\RxJava>java ObservableTester

它应该产生以下输出 -

Processing Thread pool-1-thread-1
Processing Thread pool-3-thread-1
Receiver Thread pool-1-thread-1, Item length 1
Processing Thread pool-4-thread-1
Receiver Thread pool-4-thread-1, Item length 3
Receiver Thread pool-3-thread-1, Item length 2

RxJava - 缓冲

缓冲运算符允许将 Observable 发出的项目收集到列表或包中,并发出这些包而不是项目。在下面的示例中,我们创建了一个 Observable 来发出 9 个项目,并使用缓冲,3 个项目将一起发出。

缓冲示例

使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import j