使用RxJS来构建响应式应用

简介

RxJS is the Reactive-Extensions(ReactiveX) for JS

ReactiveX 由微软发起并推动,是一个为用于实现异步和基于事件编程的程序库制定的一个标准,RxJS 是对这个标准的 Javascript 实现。

RP(Reactive Programming)响应式编程的思路是把随时间不断变化的数据、状态、事件等等转成可被观察的序列(Observable Sequence),然后订阅序列中那些Observable对象的变化,一旦变化,就会执行事先安排好的各种转换和操作。


ReactiveX


OC 实现的 RAC(ReactiveCocoa) 与 RxJS 执行了两个不同的标准,RAC 遵循的 RP 标准与 Rx 在概念上有一些不同(特别体现在冷热信号上)。


RxJS 提供了什么

  1. 事件管理、调度引擎
  2. 十分丰富的操作符
  3. 流式的数据处理方法
  4. 声明式的编程风格

重要的概念


Observable (信号源)

Observable

emit 向流(stream)上发射对象(信号)


Observer(Subscriber, reactor,iterator)

Observer 在信号流中是一个观察者(哨兵)的角色,它负责观察任务执行的状态并向流中发射信号

onNext

向流中发射一个正常的信号。

onError

向流中发射一个错误信号,并终止 Observable 的信号发射。

onCompleted

向流中发射一个完成信号,并终止 Observable 的信号发射。


1
2
3
4
5
6
7
8
9
10
11
12
let responseStream = Rx.Observable.create((observer) => {
fetch(reqUrl).then((data) => {
observer.onNext(data);
observer.onCompleted('completed');
}, (err) => {
observer.onError(err);
})
responseStream.subscribe((data)=>{...},
(err)=>{...},
(completed)=>{...})



Operators (操作符)

链式的操作符构成了一个容纳信号(数据)流的管道,操作会按照操作符的排列顺序有序执行,RxJS 提供了60多个内置操作符,足以应付数据流处理的绝大部分需求。更为灵活的是,我们还可以在 RxJS 上自定义操作符。


  1. create 创建 Observable
    立即创建一个 Observable 对象或者在订阅时才创建 Observable 对象(Lazy load)
    create
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 创建一个 Observable
Rx.Observable.create((observer)=>{observer.onNext()});

// 创建一个空的 Observable
Rx.Observable.empty();

// 从 Interval 创建
Rx.Observable.interval(100);

// 从 promise 创建
Rx.Observable.fromPromise(fetch(url));

// 从数组创建
Rx.Observable.fromArray(array)

  1. subscribe/connect Observable
    只有当 Observable 被订阅或者 connect 后,Observable 才会发射信号。
1
2
3
4
5
6
7
8
9
10
11
12
// 订阅事件
let source = Rx.Observable.interval(100);
source.subscribe((event)=>{
// do something
})

// connect 事件
let source = Rx.Observable.interval(100).publish();
source.connect().subscribe((event)=>{
// do something
})


  1. transform 变换 Observable 的形式
    eg: map, pluck, flatmap, publish
1
2
3
4
5
6
7
8
9
10
11
12
13
var source = Rx.Observable
.from([
{ value: {index: 0} },
{ value: {index: 1} },
{ value: {index: 2} }
])
.pluck('value')
.map(value=>{
return value.index;
})
.subscribe(index=>{
console.log(index) // Output 0,1,2
})

  1. filter 对 Observable 发射出的对象进行过滤并将对象向管道的后方传递
    filter
    eg: debounce, filter, first,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// filter
var source = Rx.Observable
.from([
{ value: 0 },
{ value: 1 },
{ value: 2 }
])
.filter(data=>{
return data.value > 1;
})
.subscribe(obj=>{
console.log(obj) // Output { value: 2 }
})



  1. combine 将多个 Observable 组合成一个 Observalbe
    Combine
    eg: merge, combineLatest, join, switch

多个请求并发:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

let responseStream1 = Rx.Observable.fromPromise(fetch(reqUrl1));
let responseStream2 = Rx.Observable.fromPromise(fetch(reqUrl2));
Rx.Observable
.combineLatest(responseStream1, responseStream2)
.subscribe((tuple) => {
let data1 = tuple[0];
let data2 = tuple[1];
}, (err) => {... }, (completed) => {})

// or
Rx.Observable
.combineLatest(responseStream1, responseStream2, (data1, data2) => {
return processData(data1, data2);
})
.subscribe((result) => {
//do something
}, (err) => {... }, (completed) => {})

  1. utilities and the other 工具运算符以及其他运算符

参见 ReactiveX 核心操作符


Scheduler (调度器)

可以单独调度某个操作以实现更复杂的异步编程,可以实现让某个操作运行在特定的一个事件循环中。

参见 Scheduler


Subject

Alt text
它是一个代理对象,既是一个 Observable 又是一个 Observer,它可以同时接受 Observable 发射出的数据,也可以向订阅了它的 observer 发射数据。ReaciveX 标准中有四种 Subject

参见 Subject


Cold/Hot/Connectable Observable


Cold Observable:

这种 Observable 创建之后并不会马上开始向事件流中发射对象,只有当它的 subscribe 方法被调用之后,它才会开始向外发送信号,


以下代码创建了一个 Cold Observable,这时即使 keyup 事件被触发,这个 Observable 对象也不会发出任何信号:

1
2
3
// inputValueStream is a cold Observable
let inputValueStream = Rx.Observable.fromEvent(input, 'keyup')
.pluck('target', 'value');

在调用 subscribe 方法之后这个 Observable 才会被真正触发,这时触发 keyup 事件,input 中的 value 将会被发射到流中进而被后续的操作符处理。

1
2
3
4
inputValueStream
.subscribe(data => text.textContent = data,
err=>{},
complete=>{});

Rx 规范中通过创建 Observable 操作符创建出来的对象都是冷信号。


Hot/Connectable Observable

和 Cold Observable 相反,Hot Observable 在创建出来的那一刻就已经开始向流中发射信号,而 Connectable Observable 在创建时候并不会发射信号,直到 connect 方法被调用。


在 RxJS 中创建这两种 Observable 的方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Hot and Connectable Observable
let input5 = document.querySelector('#i5');
let p5 = document.querySelector('#p5');
let input5Stream = Rx.Observable.fromEvent(input5, 'keyup')
.pluck('target', 'value')
.map(data => {
console.log(data);
return data;
})
.publish();//调用 publish 之后 input5Stream 是一个 Connectable Observable
input5Stream.connect();// 在调用 connect 方法之后,input5Stream 成为了 Hot Observable
window.setTimeout(() => {
input5Stream.subscribe(data => {
p5.textContent = data
});
}, 3000);

优点


  1. 可以多次发射信号

  1. 信号流可以被取消
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//Cancel a RxJS Observable
const subscription = Rx.Observable.create((observer) => {
let counter = 0;
setInterval(() => {
counter++;
if (counter > 10) {
observer.onCompleted();
} else {
observer.onNext(counter);//can be called many times
}
}, 500);
}).subscribe(counter => p2.textContent = counter,
(err) => {},
(completed) => {})

Rx.Observable.fromEvent(stopBtn, 'click')
.subscribe((event) => {
// cancel Observable
subscription.dispose();// AutoDetachObserver->AbstractObserver->Observer
})

  1. 提供了极其丰富的操作符来对异步数据流进行操作
  2. 可以精确控制事件的进行过程
  • 可以在需要的时候才触发过程的执行和信号的发射
  • 可以在执行失败的时候重试等等
  1. 有序的流式操作符合人类的思维模式
  2. 提供了很多类似数组方法(Array Extras)的操作符,类比数组的方式对信号(数据)流进行处理。
  3. 操作符具有比较清晰的语义(相比 Promise 的 then)
  4. 无任何依赖,可以和多种框架良好的配合

不足

  1. 内容和概念较多,学习成本较高
  2. 链式操作很长,代码提示不好(Rx 很适合在强类型语境下使用)
  3. 需要引入额外的库 (相比原生 Promise)

与 Promise对比的例子:

网络请求的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// example promise net work
let requestPromise = function() {
return new Promise((resolve, reject) => {
requestUrl(url).then(data => resolve(data), err => reject(failed));
})
}
requestPromise().then(data => {
let temp = data.ret_data;
// 过滤数据
if (temp && temp.length) {
//处理数据
let result = processRtnData(temp);
displayData(result);
}
}, err => {
console.log(err);
})


Promise 实现
更关注控制流
描述怎么做的命令式编程风格


1
2
3
4
5
// example rxjs net work
Rx.Observable.fromPromise(requestUrl(url))
.filter(data => data.ret_data && data.ret_data.length > 0)
.map(data => processRtnData(data.ret_data))
.subscribe(data => displayData(data));

RxJS 实现
更关注数据本身的流动,且流动是有序的
描述做什么的声明式代码
简洁直观,符合人的自然思维


应用场景


网络请求

  1. 多个网络请求并发
  2. 取消对进行中的网络请求
  3. 实时搜索的节流功能

交互

  1. 允许在一
    个过程中多次发出信号(密码框)
  2. 将 UI 事件直接转换为信号进行流式处理

Angular2 中 Rx.js 的应用;

Angular2 在框架级别集成了 RxJS

  1. facade 模块中的 async.js
    |EventEmitter 用于为 components 提供自定义事件的支持
  2. forms 模块
  3. Http 模块

参考:
ReactiveX
RxJS
ReactiveX 核心操作符
Promise vs Observable
Subject详解
Scheduler