简介
RxJS is the Reactive-Extensions(ReactiveX) for JS
ReactiveX 由微软发起并推动,是一个为用于实现异步和基于事件编程的程序库制定的一个标准,RxJS 是对这个标准的 Javascript 实现。
RP(Reactive Programming)响应式编程的思路是把随时间不断变化的数据、状态、事件等等转成可被观察的序列(Observable Sequence),然后订阅序列中那些Observable对象的变化,一旦变化,就会执行事先安排好的各种转换和操作。
OC 实现的 RAC(ReactiveCocoa) 与 RxJS 执行了两个不同的标准,RAC 遵循的 RP 标准与 Rx 在概念上有一些不同(特别体现在冷热信号上)。
RxJS 提供了什么
- 事件管理、调度引擎
- 十分丰富的操作符
- 流式的数据处理方法
- 声明式的编程风格
重要的概念
Observable (信号源)
emit 向流(stream)上发射对象(信号)
Observer(Subscriber, reactor,iterator)
Observer 在信号流中是一个观察者(哨兵)的角色,它负责观察任务执行的状态并向流中发射信号
onNext
向流中发射一个正常的信号。
onError
向流中发射一个错误信号,并终止 Observable 的信号发射。
onCompleted
向流中发射一个完成信号,并终止 Observable 的信号发射。
1 2 3 4 5 6 7 8 9 10 11 12
| let responseStream = Rx.Observable.create((observer) => { fetch(reqUrl).then((data) => { observer.onNext(data); observer.onCompleted('completed'); }, (err) => { observer.onError(err); }) responseStream.subscribe((data)=>{...}, (err)=>{...}, (completed)=>{...})
|
Operators (操作符)
链式的操作符构成了一个容纳信号(数据)流的管道,操作会按照操作符的排列顺序有序执行,RxJS 提供了60多个内置操作符,足以应付数据流处理的绝大部分需求。更为灵活的是,我们还可以在 RxJS 上自定义操作符。
- create 创建 Observable
立即创建一个 Observable 对象或者在订阅时才创建 Observable 对象(Lazy load)
create
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| // 创建一个 Observable Rx.Observable.create((observer)=>{observer.onNext()});
// 创建一个空的 Observable Rx.Observable.empty();
// 从 Interval 创建 Rx.Observable.interval(100);
// 从 promise 创建 Rx.Observable.fromPromise(fetch(url));
// 从数组创建 Rx.Observable.fromArray(array)
|
- subscribe/connect Observable
只有当 Observable 被订阅或者 connect 后,Observable 才会发射信号。
1 2 3 4 5 6 7 8 9 10 11 12
| // 订阅事件 let source = Rx.Observable.interval(100); source.subscribe((event)=>{ // do something })
// connect 事件 let source = Rx.Observable.interval(100).publish(); source.connect().subscribe((event)=>{ // do something })
|
- transform 变换 Observable 的形式
eg: map, pluck, flatmap, publish
1 2 3 4 5 6 7 8 9 10 11 12 13
| var source = Rx.Observable .from([ { value: {index: 0} }, { value: {index: 1} }, { value: {index: 2} } ]) .pluck('value') .map(value=>{ return value.index; }) .subscribe(index=>{ console.log(index) // Output 0,1,2 })
|
- filter 对 Observable 发射出的对象进行过滤并将对象向管道的后方传递
eg: debounce, filter, first,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| // filter var source = Rx.Observable .from([ { value: 0 }, { value: 1 }, { value: 2 } ]) .filter(data=>{ return data.value > 1; }) .subscribe(obj=>{ console.log(obj) // Output { value: 2 } })
|
- combine 将多个 Observable 组合成一个 Observalbe
eg: merge, combineLatest, join, switch
多个请求并发:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| let responseStream1 = Rx.Observable.fromPromise(fetch(reqUrl1)); let responseStream2 = Rx.Observable.fromPromise(fetch(reqUrl2)); Rx.Observable .combineLatest(responseStream1, responseStream2) .subscribe((tuple) => { let data1 = tuple[0]; let data2 = tuple[1]; }, (err) => {... }, (completed) => {})
// or Rx.Observable .combineLatest(responseStream1, responseStream2, (data1, data2) => { return processData(data1, data2); }) .subscribe((result) => { //do something }, (err) => {... }, (completed) => {})
|
- utilities and the other 工具运算符以及其他运算符
参见 ReactiveX 核心操作符
Scheduler (调度器)
可以单独调度某个操作以实现更复杂的异步编程,可以实现让某个操作运行在特定的一个事件循环中。
参见 Scheduler
Subject
它是一个代理对象,既是一个 Observable 又是一个 Observer,它可以同时接受 Observable 发射出的数据,也可以向订阅了它的 observer 发射数据。ReaciveX 标准中有四种 Subject
参见 Subject
Cold/Hot/Connectable Observable
Cold Observable:
这种 Observable 创建之后并不会马上开始向事件流中发射对象,只有当它的 subscribe 方法被调用之后,它才会开始向外发送信号,
以下代码创建了一个 Cold Observable,这时即使 keyup
事件被触发,这个 Observable 对象也不会发出任何信号:
1 2 3
| // inputValueStream is a cold Observable let inputValueStream = Rx.Observable.fromEvent(input, 'keyup') .pluck('target', 'value');
|
在调用 subscribe 方法之后这个 Observable 才会被真正触发,这时触发 keyup 事件,input 中的 value 将会被发射到流中进而被后续的操作符处理。
1 2 3 4
| inputValueStream .subscribe(data => text.textContent = data, err=>{}, complete=>{});
|
Rx 规范中通过创建 Observable 操作符创建出来的对象都是冷信号。
Hot/Connectable Observable
和 Cold Observable 相反,Hot Observable 在创建出来的那一刻就已经开始向流中发射信号,而 Connectable Observable 在创建时候并不会发射信号,直到 connect
方法被调用。
在 RxJS 中创建这两种 Observable 的方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| // Hot and Connectable Observable let input5 = document.querySelector('#i5'); let p5 = document.querySelector('#p5'); let input5Stream = Rx.Observable.fromEvent(input5, 'keyup') .pluck('target', 'value') .map(data => { console.log(data); return data; }) .publish();//调用 publish 之后 input5Stream 是一个 Connectable Observable input5Stream.connect();// 在调用 connect 方法之后,input5Stream 成为了 Hot Observable window.setTimeout(() => { input5Stream.subscribe(data => { p5.textContent = data }); }, 3000);
|
优点
- 可以多次发射信号
- 信号流可以被取消
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| //Cancel a RxJS Observable const subscription = Rx.Observable.create((observer) => { let counter = 0; setInterval(() => { counter++; if (counter > 10) { observer.onCompleted(); } else { observer.onNext(counter);//can be called many times } }, 500); }).subscribe(counter => p2.textContent = counter, (err) => {}, (completed) => {})
Rx.Observable.fromEvent(stopBtn, 'click') .subscribe((event) => { // cancel Observable subscription.dispose();// AutoDetachObserver->AbstractObserver->Observer })
|
- 提供了极其丰富的操作符来对异步数据流进行操作
- 可以精确控制事件的进行过程
- 可以在需要的时候才触发过程的执行和信号的发射
- 可以在执行失败的时候重试等等
- 有序的流式操作符合人类的思维模式
- 提供了很多类似数组方法(Array Extras)的操作符,类比数组的方式对信号(数据)流进行处理。
- 操作符具有比较清晰的语义(相比 Promise 的 then)
- 无任何依赖,可以和多种框架良好的配合
不足
- 内容和概念较多,学习成本较高
- 链式操作很长,代码提示不好(Rx 很适合在强类型语境下使用)
- 需要引入额外的库 (相比原生 Promise)
与 Promise对比的例子:
网络请求的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| // example promise net work let requestPromise = function() { return new Promise((resolve, reject) => { requestUrl(url).then(data => resolve(data), err => reject(failed)); }) } requestPromise().then(data => { let temp = data.ret_data; // 过滤数据 if (temp && temp.length) { //处理数据 let result = processRtnData(temp); displayData(result); } }, err => { console.log(err); })
|
Promise 实现
更关注控制流
描述怎么做的命令式编程风格
1 2 3 4 5
| // example rxjs net work Rx.Observable.fromPromise(requestUrl(url)) .filter(data => data.ret_data && data.ret_data.length > 0) .map(data => processRtnData(data.ret_data)) .subscribe(data => displayData(data));
|
RxJS 实现
更关注数据本身的流动,且流动是有序的
描述做什么的声明式代码
简洁直观,符合人的自然思维
应用场景
网络请求
- 多个网络请求并发
- 取消对进行中的网络请求
- 实时搜索的节流功能
交互
- 允许在一
个过程中多次发出信号(密码框)
- 将 UI 事件直接转换为信号进行流式处理
Angular2 中 Rx.js 的应用;
Angular2 在框架级别集成了 RxJS
- facade 模块中的 async.js
|EventEmitter 用于为 components 提供自定义事件的支持
- forms 模块
- Http 模块
参考:
ReactiveX
RxJS
ReactiveX 核心操作符
Promise vs Observable
Subject详解
Scheduler