RxJS - 处理主题


主题是一个可以多播的可观察对象,即与许多观察者交谈。考虑一个带有事件监听器的按钮,每次用户单击按钮时,都会调用使用添加监听器附加到事件的函数,类似的功能也适用于主题。

我们将在本章中讨论以下主题 -

  • 创建主题
  • 可观察和主题有什么区别?
  • Behave主体
  • 重播主题
  • 异步主题

创建主题

要使用主题,我们需要导入主题,如下所示 -

import { Subject } from 'rxjs';

您可以按如下方式创建主题对象 -

const subject_test = new Subject();

该对象是一个具有三种方法的观察者 -

  • 下一个(五)
  • 错误(e)
  • 完全的()

订阅主题

您可以对该主题创建多个订阅,如下所示 -

subject_test.subscribe({
   next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
   next: (v) => console.log(`From Subject: ${v}`)
});

订阅被注册到主题对象,就像我们之前讨论的 addlistener 一样。

将数据传递给主体

您可以将数据传递给使用 next() 方法创建的主题。

subject_test.next("A");

数据将传递到该主题上添加的所有订阅。

例子

这是该主题的一个工作示例 -

import { Subject } from 'rxjs';

const subject_test = new Subject();

subject_test.subscribe({
   next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
   next: (v) => console.log(`From Subject: ${v}`)
});
subject_test.next("A");
subject_test.next("B");

subject_test 对象是通过调用 new subject() 创建的。subject_test 对象引用了 next()、error() 和complete() 方法。上述示例的输出如下所示 -

输出

传递数据

我们可以使用complete()方法来停止主题执行,如下所示。

例子

import { Subject } from 'rxjs';

const subject_test = new Subject();

subject_test.subscribe({
   next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
   next: (v) => console.log(`From Subject: ${v}`)
});
subject_test.next("A");
subject_test.complete();
subject_test.next("B");

一旦我们调用完成,后面调用的下一个方法就不会被调用。

输出

传递数据方法

现在让我们看看如何调用error()方法。

例子

下面是一个工作示例 -

import { Subject } from 'rxjs';

const subject_test = new Subject();

subject_test.subscribe({
   error: (e) => console.log(`From Subject : ${e}`)
});
subject_test.subscribe({
   error: (e) => console.log(`From Subject : ${e}`)
});
subject_test.error(new Error("There is an error"));

输出

传递数据错误

可观察和主题有什么区别?

可观察者将与订阅者进行一对一的交谈。每当您订阅可观察对象时,执行都会从头开始。使用 ajax 进行 Http 调用,并有 2 个订阅者调用 observable。您将在浏览器网络选项卡中看到 2 个 HttpHttp 请求。

例子

这是一个相同的工作示例 -

import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';

let final_val = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
let subscriber1 = final_val.subscribe(a => console.log(a));
let subscriber2 = final_val.subscribe(a => console.log(a));

输出

可观察的

可观察的Ex

现在,问题是,我们希望共享相同的数据,但不共享,代价是 2 个 Http 调用。我们希望进行一次 Http 调用并在订阅者之间共享数据。

使用主题可以实现这一点。它是一个可以多播的可观察对象,即与许多观察者交谈。它可以在订阅者之间分享价值。

例子

这是一个使用主题的工作示例 -

import { Subject } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';

const subject_test = new Subject();

subject_test.subscribe({
   next: (v) => console.log(v)
});
subject_test.subscribe({
   next: (v) => console.log(v)
});

let final_val = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
let subscriber = final_val.subscribe(subject_test);

输出

可观察到的可能

现在您只能看到一个 Http 调用,并且相同的数据在调用的订阅者之间共享。

可观察的订阅者

Behave主体

Behave主体在调用时会给你最新的值。

您可以创建Behave主体,如下所示 -

import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject("Testing Behaviour Subject"); 
// initialized the behaviour subject with value:Testing Behaviour Subject

例子

这是一个使用Behave主题的工作示例 -

import { BehaviorSubject } from 'rxjs';
const behavior_subject = new BehaviorSubject("Testing Behaviour Subject"); 
// 0 is the initial value

behavior_subject.subscribe({
   next: (v) => console.log(`observerA: ${v}`)
});

behavior_subject.next("Hello");
behavior_subject.subscribe({
   next: (v) => console.log(`observerB: ${v}`)
});
behavior_subject.next("Last call to Behaviour Subject");

输出

Behave主体

重播主题

重放主体类似于Behave主体,其中,它可以缓冲值并将其重放给新订阅者。

例子

这是重播主题的一个工作示例 -

import { ReplaySubject } from 'rxjs';
const replay_subject = new ReplaySubject(2); 
// buffer 2 values but new subscribers

replay_subject.subscribe({
   next: (v) => console.log(`Testing Replay Subject A: ${v}`)
});

replay_subject.next(1);
replay_subject.next(2);
replay_subject.next(3);
replay_subject.subscribe({
   next: (v) => console.log(`Testing Replay Subject B: ${v}`)
});

replay_subject.next(5);

重放主题上使用的缓冲区值为 2。因此,最后两个值将被缓冲并用于呼叫的新订阅者。

输出

重播主题

异步主题

在 AsyncSubject 的情况下,最后调用的值将传递给订阅者,并且仅在调用complete()方法后才会完成。

例子

这是一个相同的工作示例 -

import { AsyncSubject } from 'rxjs';

const async_subject = new AsyncSubject();

async_subject.subscribe({
   next: (v) => console.log(`Testing Async Subject A: ${v}`)
});

async_subject.next(1);
async_subject.next(2);
async_subject.complete();
async_subject.subscribe({
   next: (v) => console.log(`Testing Async Subject B: ${v}`)
});

这里,在调用完成之前,传递给主题的最后一个值是 2,并且与传递给订阅者的值相同。

输出

异步主题