RxJS 是一個(gè)響應(yīng)式的庫(kù),它接收從事件源發(fā)出的一個(gè)個(gè)事件,經(jīng)過(guò)處理管道的層層處理之后,傳入最終的接收者,這個(gè)處理管道是由操作符組成的,開(kāi)發(fā)者只需要選擇和組合操作符就能完成各種異步邏輯,極大簡(jiǎn)化了異步編程。除此以外,RxJS 的設(shè)計(jì)還遵循了函數(shù)式、流的理念。
直接講概念比較難理解,不如我們實(shí)現(xiàn)一個(gè)簡(jiǎn)易的 RxJS 再來(lái)看這些。
RxJS 的使用
RxJS 會(huì)對(duì)事件源做一層封裝,叫做 Observable,由它發(fā)出一個(gè)個(gè)事件。
比如這樣:
const source = new Observable((observer) => { let i = 0; setInterval(() => { observer.next(++i); }, 1000); });
在回調(diào)函數(shù)里面設(shè)置一個(gè)定時(shí)器,不斷通過(guò) next 傳入事件。
這些事件會(huì)被接受者監(jiān)聽(tīng),叫做 Observer。
const subscription = source.subscribe({ next: (v) => console.log(v), error: (err) => console.error(err), complete: () => console.log('complete'), });
observer 可以接收 next 傳過(guò)來(lái)的事件,傳輸過(guò)程中可能有 error,也可以在這里處理 error,還可以處理傳輸完成的事件。
這樣的一個(gè)監(jiān)聽(tīng)或者說(shuō)訂閱,叫做 Subscription。
可以訂閱當(dāng)然也可以取消訂閱:
subscription.unsubscribe();
取消訂閱時(shí)的回調(diào)函數(shù)是在 Observable 里返回的:
const source = new Observable((observer) => { let i = 0; const timer = setInterval(() => { observer.next(++i); }, 1000); return function unsubscribe() { clearInterval(timer); }; });
發(fā)送事件、監(jiān)聽(tīng)事件只是基礎(chǔ),處理事件的過(guò)程才是 RxJS 的精髓,它設(shè)計(jì)了管道的概念,可以用操作符 operator 來(lái)組裝這個(gè)管道:
source.pipe( map((i) => ++i), map((i) => i * 10) ).subscribe(() => { //... })
事件經(jīng)過(guò)管道之后才會(huì)傳到 Observer,在傳輸過(guò)程中會(huì)經(jīng)過(guò)一個(gè)個(gè)操作符的處理。
比如這里的處理邏輯是,對(duì)傳過(guò)來(lái)的數(shù)據(jù)加 1,然后再乘以 10。
綜上,使用 RxJS 的代碼就是這樣的:
const source = new Observable((observer) => { let i = 0; const timer = setInterval(() => { observer.next(++i); }, 1000); return function unsubscribe() { clearInterval(timer); }; }); const subscription = source.pipe( map((i) => ++i), map((i) => i * 10) ).subscribe({ next: (v) => console.log(v), error: (err) => console.error(err), complete: () => console.log('complete'), }); setTimeout(() => { subscription.unsubscribe(); }, 4500);
我們通過(guò) Observable 創(chuàng)建了一個(gè)事件源,每秒發(fā)出一個(gè)事件,這些事件會(huì)經(jīng)過(guò)管道的處理再傳遞給 Observer,管道的組成是兩個(gè) map 操作符,對(duì)數(shù)據(jù)做了 + 1 和 * 10 的處理。
Observer 接收到傳遞過(guò)來(lái)的數(shù)據(jù),做了打印,還對(duì)錯(cuò)誤和結(jié)束時(shí)的事件做了處理。此外,Observable 提供了取消訂閱時(shí)的處理邏輯,當(dāng)我們?cè)?4.5s 取消訂閱時(shí),就可以清除定時(shí)器。
使用 RxJS 基本就是這個(gè)流程,那它是怎么實(shí)現(xiàn)的呢?
80 行代碼實(shí)現(xiàn)
RxJS先從事件源開(kāi)始,實(shí)現(xiàn) Observable:
觀察下它的特點(diǎn):
- 它接收一個(gè)回調(diào)函數(shù),里面可以調(diào)用 next 來(lái)傳輸數(shù)據(jù)。
- 它有 subscribe 方法可以用來(lái)添加 Observer 的訂閱,返回 subscription
- 它可以在回調(diào)函數(shù)里返回 unsbscribe 時(shí)的處理邏輯
- 它有 pipe 方法可以傳入操作符
我們按照這些特點(diǎn)來(lái)實(shí)現(xiàn)下:
首先,Observable 的構(gòu)造函數(shù)要接收回調(diào)函數(shù) _subscribe,但是不是立刻調(diào)用,而是在 subscribe 的時(shí)候才調(diào)用:
class Observable { constructor(_subscribe) { this._subscribe = _subscribe; } subscribe() { this._subscribe(); } }
回調(diào)函數(shù)的參數(shù)是有 next、error、complete 方法的對(duì)象,用于傳遞事件:
class Observable { constructor(_subscribe) { this._subscribe = _subscribe; } subscribe(observer) { const subscriber = new Subscriber(observer); this._subscribe(subscriber); } } class Subscriber{ constructor(observer) { super(); this.observer = observer; this.isStopped = false; } next(value) { if (this.observer.next && !this.isStopped) { this.observer.next(value); } } error(value) { this.isStopped = true; if (this.observer.error) { this.observer.error(value); } } complete() { this.isStopped = true; if (this.observer.complete) { this.observer.complete(); } if (this.unsubscribe) { this.unsubscribe(); } } }
這樣,在回調(diào)函數(shù)里面就可以調(diào)用 next、error、complete 方法了:
此外,回調(diào)函數(shù)的返回值是 unsbscribe 時(shí)的處理邏輯,要收集起來(lái),在取消訂閱時(shí)調(diào)用:
class Subscription { constructor() { this._teardowns = []; } unsubscribe() { this._teardowns.forEach((teardown) => { typeof teardown === 'function' ? teardown() : teardown.unsubscribe() }); } add(teardown) { if (teardown) { this._teardowns.push(teardown); } } }
提供 unsubscribe 方法用于取消訂閱,_teardowns 用于收集所有的取消訂閱時(shí)的回調(diào),在 unsubscribe 時(shí)調(diào)用所有 teardown 回調(diào)。
這段邏輯比較通用,可以作為 Subscriber 的父類。
然后,在 Observable 里調(diào)用 add 來(lái)添加 teardown,并且返回 subscription(它有 unsubscribe 方法):
class Observable { constructor(_subscribe) { this._subscribe = _subscribe; } subscribe(observer) { const subscriber = new Subscriber(observer); subscriber.add(this._subscribe(subscriber)); return subscriber; } } class Subscriber extends Subscription { constructor(observer) { super(); this.observer = observer; this.isStopped = false; } next(value) { if (this.observer.next && !this.isStopped) { this.observer.next(value); } } error(value) { this.isStopped = true; if (this.observer.error) { this.observer.error(value); } } complete() { this.isStopped = true; if (this.observer.complete) { this.observer.complete(); } if (this.unsubscribe) { this.unsubscribe(); } } } class Subscription { constructor() { this._teardowns = []; } unsubscribe() { this._teardowns.forEach((teardown) => { typeof teardown === 'function' ? teardown() : teardown.unsubscribe() }); } add(teardown) { if (teardown) { this._teardowns.push(teardown); } } }
這樣,我們就實(shí)現(xiàn)了 Observable 和 Observer,只寫了 50 行代碼。先來(lái)測(cè)試下:
const source = new Observable((observer) => { let i = 0; const timer = setInterval(() => { observer.next(++i); }, 1000); return function unsubscribe() { clearInterval(timer); }; }); const subscription = source.subscribe({ next: (v) => console.log(v), error: (err) => console.error(err), complete: () => console.log('complete'), }); setTimeout(() => { subscription.unsubscribe(); }, 4500);
Observer 監(jiān)聽(tīng)到了 Observable 傳遞過(guò)來(lái)的 1、2、3、4 的數(shù)據(jù),因?yàn)樵?4.5s 時(shí)取消了訂閱,所以后面就不再有數(shù)據(jù)了。
我們用 50 行實(shí)現(xiàn)了基礎(chǔ)的 RxJS!
當(dāng)然,最精髓的 operator 還沒(méi)有實(shí)現(xiàn),接下來(lái)繼續(xù)完善。
我們給 Observable 添加 pipe 方法,它會(huì)調(diào)用傳入的 operator,并且上個(gè)的結(jié)果是下個(gè)的輸入,這樣就串起來(lái)了,也就是管道的概念:
class Observable { constructor(_subscribe) { //... } subscribe(observer) { //... } pipe(...operations) { return pipeFromArray(operations)(this); } } function pipeFromArray(fns) { if (fns.length === 0) { return (x) => x; } if (fns.length === 1) { return fns[0]; } return (input) => { return fns.reduce((prev, fn) => fn(prev), input); }; }
當(dāng)傳入的參數(shù)是 0 個(gè)的時(shí)候,就直接返回之前的 Observable,1 個(gè)的時(shí)候直接返回,否則就通過(guò) reduce 的方式串聯(lián)起來(lái),組成管道。
operator 的實(shí)現(xiàn)就是監(jiān)聽(tīng)上一個(gè) Observable,返回一個(gè)新的。
比如 map 的實(shí)現(xiàn),就是傳入 project 對(duì) value 做處理,把結(jié)果用 next 傳下去:
function map(project) { return (observable) => new Observable((subscriber) => { const subcription = observable.subscribe({ next(value) { return subscriber.next(project(value)); }, error(err) { subscriber.error(err); }, complete() { subscriber.complete(); }, }); return subcription; }); }
這樣我們就實(shí)現(xiàn)了 operator,來(lái)測(cè)試下:
我們調(diào)用了 pipe 方法,使用兩個(gè) map 操作符來(lái)組織處理流程,對(duì)數(shù)據(jù)做了 +1 和 *10 的處理。
所以,Observable 傳遞過(guò)來(lái)的 1、2、3、4 傳遞給 Observer 的時(shí)候就變成了 20、30、40、50。
至此,我們實(shí)現(xiàn)了 RxJS 的 Observable、Observer、Subscription、operator 等概念,是一個(gè)簡(jiǎn)易版 RxJS 了。只用了 80 行代碼。
再來(lái)看最開(kāi)始的那些理念:
為什么叫做響應(yīng)式呢?
因?yàn)槭菍?duì)事件源做監(jiān)聽(tīng)和一系列處理的,這種編程模式就叫做響應(yīng)式。
為什么叫函數(shù)式呢?
因?yàn)槊恳徊?operator 都是純函數(shù),返回一個(gè)新的 Observable,這符合函數(shù)式的不可變,修改后返回一個(gè)新的的理念。
為什么叫流呢?
因?yàn)橐粋€(gè)個(gè)事件是動(dòng)態(tài)產(chǎn)生和傳遞的,這種數(shù)據(jù)的動(dòng)態(tài)產(chǎn)生和傳遞就可以叫做流。
完整代碼如下:
function pipeFromArray(fns) { if (fns.length === 0) { return (x) => x; } if (fns.length === 1) { return fns[0]; } return (input) => { return fns.reduce((prev, fn) => fn(prev), input); }; } class Subscription { constructor() { this._teardowns = []; } unsubscribe() { this._teardowns.forEach((teardown) => { typeof teardown === 'function' ? teardown() : teardown.unsubscribe() }); } add(teardown) { if (teardown) { this._teardowns.push(teardown); } } } class Subscriber extends Subscription { constructor(observer) { super(); this.observer = observer; this.isStopped = false; } next(value) { if (this.observer.next && !this.isStopped) { this.observer.next(value); } } error(value) { this.isStopped = true; if (this.observer.error) { this.observer.error(value); } } complete() { this.isStopped = true; if (this.observer.complete) { this.observer.complete(); } if (this.unsubscribe) { this.unsubscribe(); } } } class Observable { constructor(_subscribe) { this._subscribe = _subscribe; } subscribe(observer) { const subscriber = new Subscriber(observer); subscriber.add(this._subscribe(subscriber)); return subscriber; } pipe(...operations) { return pipeFromArray(operations)(this); } } function map(project) { return (observable) => new Observable((subscriber) => { const subcription = observable.subscribe({ next(value) { return subscriber.next(project(value)); }, error(err) { subscriber.error(err); }, complete() { subscriber.complete(); }, }); return subcription; }); } const source = new Observable((observer) => { let i = 0; const timer = setInterval(() => { observer.next(++i); }, 1000); return function unsubscribe() { clearInterval(timer); }; }); const subscription = source.pipe( map((i) => ++i), map((i) => i * 10) ).subscribe({ next: (v) => console.log(v), error: (err) => console.error(err), complete: () => console.log('complete'), }); setTimeout(() => { subscription.unsubscribe(); }, 4500);
總結(jié)
為了理解 RxJS 的響應(yīng)式、函數(shù)式、流等理念,我們實(shí)現(xiàn)了簡(jiǎn)易版的 RxJS。
我們實(shí)現(xiàn)了 Observable、Observer、Subscription 等概念,完成了事件的產(chǎn)生和訂閱以及取消訂閱。
接著又實(shí)現(xiàn)了 operator 和 pipe,每個(gè) operator 返回一個(gè)新的 Observable,對(duì)數(shù)據(jù)做層層處理。
寫完以后,我們能更清晰的理解響應(yīng)式、函數(shù)式、流等理念在 RxJS 里是怎么體現(xiàn)的。
實(shí)現(xiàn)簡(jiǎn)易版 RxJS,只需要 80 行代碼。
原文地址:https://mp.weixin.qq.com/s/G-Af4gDuBuXuHMTV-de-0w