rxjava是一個(gè)在java虛擬機(jī)上的響應(yīng)式擴(kuò)展,通過(guò)使用可觀察的序列將異步和基于事件的程序組合起來(lái)的一個(gè)庫(kù)。
它擴(kuò)展了觀察者模式來(lái)支持?jǐn)?shù)據(jù)/事件序列,并且添加了操作符,這些操作符允許你聲明性地組合序列,同時(shí)抽象出要關(guān)注的問(wèn)題:比如低級(jí)線(xiàn)程、同步、線(xiàn)程安全和并發(fā)數(shù)據(jù)結(jié)構(gòu)等。
rxjava相信大家都非常了解吧,今天分享一下rxjava的消息發(fā)送和線(xiàn)程源碼的分析。最后并分享一個(gè)相關(guān)demo,讓大家更加熟悉我們天天都在用的框架。
消息訂閱發(fā)送
首先讓我們看看消息訂閱發(fā)送最基本的代碼組成:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
observable observable = observable.create( new observableonsubscribe<string>() { @override public void subscribe(observableemitter<string> emitter) throws exception { emitter.onnext( "jack1" ); emitter.onnext( "jack2" ); emitter.onnext( "jack3" ); emitter.oncomplete(); } }); observer<string> observer = new observer<string>() { @override public void onsubscribe(disposable d) { log.d(tag, "onsubscribe" ); } @override public void onnext(string s) { log.d(tag, "onnext : " + s); } @override public void onerror(throwable e) { log.d(tag, "onerror : " + e.tostring()); } @override public void oncomplete() { log.d(tag, "oncomplete" ); } }; observable.subscribe(observer); |
代碼很簡(jiǎn)單,observable為被觀察者,observer為觀察者,然后通過(guò)observable.subscribe(observer),把觀察者和被觀察者關(guān)聯(lián)起來(lái)。被觀察者發(fā)送消息(emitter.onnext("內(nèi)容")),觀察者就可以在onnext()方法里回調(diào)出來(lái)。
我們先來(lái)看observable,創(chuàng)建是用observable.create()方法進(jìn)行創(chuàng)建,源碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public static <t> observable<t> create(observableonsubscribe<t> source) { objecthelper.requirenonnull(source, "source is null" ); return rxjavaplugins.onassembly( new observablecreate<t>(source)); } public static <t> t requirenonnull(t object, string message) { if (object == null ) { throw new nullpointerexception(message); } return object; } public static <t> observable<t> onassembly( @nonnull observable<t> source) { function<? super observable, ? extends observable> f = onobservableassembly; if (f != null ) { return apply(f, source); } return source; } |
可以看出,create()方法里最主要的還是創(chuàng)建用observableonsubscribe傳入創(chuàng)建了一個(gè)observablecreate對(duì)象并且保存而已。
1
2
3
4
5
6
7
8
|
public final class observablecreate<t> extends observable<t> { final observableonsubscribe<t> source; public observablecreate(observableonsubscribe<t> source) { this .source = source; } } |
接著是創(chuàng)建observer,這比較簡(jiǎn)單只是單純創(chuàng)建一個(gè)接口對(duì)象而已
1
2
3
4
5
6
7
8
9
|
public interface observer<t> { void onsubscribe( @nonnull disposable d); void onnext( @nonnull t t); void onerror( @nonnull throwable e); void oncomplete(); } |
訂閱發(fā)送消息
observable.subscribe(observer)的subscribe方法如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
public final void subscribe(observer<? super t> observer) { objecthelper.requirenonnull(observer, "observer is null" ); try { observer = rxjavaplugins.onsubscribe( this , observer); objecthelper.requirenonnull(observer, "plugin returned null observer" ); subscribeactual(observer); } catch (nullpointerexception e) { // nopmd throw e; } catch (throwable e) { exceptions.throwiffatal(e); rxjavaplugins.onerror(e); nullpointerexception npe = new nullpointerexception( "actually not, but can't throw other exceptions due to rs" ); npe.initcause(e); throw npe; } } //objecthelper.requirenonnull()方法 public static <t> t requirenonnull(t object, string message) { if (object == null ) { throw new nullpointerexception(message); } return object; } //rxjavaplugins.onsubscribe()方法 public static <t> observer<? super t> onsubscribe( @nonnull observable<t> source, @nonnull observer<? super t> observer) { bifunction<? super observable, ? super observer, ? extends observer> f = onobservablesubscribe; if (f != null ) { return apply(f, source, observer); } return observer; } |
從上面源碼可以看出requirenonnull()只是做非空判斷而已,而rxjavaplugins.onsubscribe()也只是返回最終的觀察者而已。所以關(guān)鍵代碼是抽象方法subscribeactual(observer);那么subscribeactual對(duì)應(yīng)哪個(gè)代碼段呢?
還記得observable.create()創(chuàng)建的observablecreate類(lèi)嗎,這就是subscribeactual()具體實(shí)現(xiàn)類(lèi),源碼如下:
1
2
3
4
5
6
7
8
9
10
|
protected void subscribeactual(observer<? super t> observer) { createemitter<t> parent = new createemitter<t>(observer); observer.onsubscribe(parent); try { source.subscribe(parent); } catch (throwable ex) { exceptions.throwiffatal(ex); parent.onerror(ex); } } |
從上面的代碼可以看出,首先創(chuàng)建了一個(gè)createemitter對(duì)象并傳入observer,然后回到observer的onsubscribe()方法,而source就是我們之前創(chuàng)建observablecreate傳入的observableonsubscribe對(duì)象。
1
2
3
4
|
class createemitter<t> extends atomicreference<disposable> implements observableemitter<t>, disposable { } |
而createemitter又繼承observableemitter接口,又回調(diào)observableonsubscribe的subscribe方法,對(duì)應(yīng)著我們的:
1
2
3
4
5
6
7
8
9
|
observable observable = observable.create( new observableonsubscribe<string>() { @override public void subscribe(observableemitter<string> emitter) throws exception { emitter.onnext( "jack1" ); emitter.onnext( "jack2" ); emitter.onnext( "jack3" ); emitter.oncomplete(); } }); |
當(dāng)它發(fā)送消息既調(diào)用emitter.onnext()方法時(shí),既調(diào)用了createemitter的onnext()方法:
1
2
3
4
5
6
7
8
9
|
public void onnext(t t) { if (t == null ) { onerror( new nullpointerexception( "onnext called with null. null values are generally not allowed in 2.x operators and sources." )); return ; } if (!isdisposed()) { observer.onnext(t); } } |
可以看到最終又回調(diào)了觀察者的onnext()方法,把被觀察者的數(shù)據(jù)傳輸給了觀察者。有人會(huì)問(wèn)
isdisposed()是什么意思,是判斷要不要終止傳遞的,我們看emitter.oncomplete()源碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
public void oncomplete() { if (!isdisposed()) { try { observer.oncomplete(); } finally { dispose(); } } } public static boolean dispose(atomicreference<disposable> field) { disposable current = field.get(); disposable d = disposed; if (current != d) { current = field.getandset(d); if (current != d) { if (current != null ) { current.dispose(); } return true ; } } return false ; } public static boolean isdisposed(disposable d) { return d == disposed; } |
dispose()方法是終止消息傳遞,也就付了個(gè)disposed常量,而isdisposed()方法就是判斷這個(gè)常量而已。這就是整個(gè)消息訂閱發(fā)送的過(guò)程,用的是觀察者模式。
線(xiàn)程切換
在上面模板代碼的基礎(chǔ)上,線(xiàn)程切換只是改變了如下代碼:
1
2
3
|
observable.subscribeon(schedulers.io()) .observeon(androidschedulers.mainthread()) .subscribe(observer); |
下面我們對(duì)線(xiàn)程切換的源碼進(jìn)行一下分析,分為兩部分:subscribeon()和observeon()
subscribeon()
首先是subscribeon()源碼如下:
1
2
3
4
|
public final observable<t> subscribeon(scheduler scheduler) { objecthelper.requirenonnull(scheduler, "scheduler is null" ); return rxjavaplugins.onassembly( new observablesubscribeon<t>( this , scheduler)); } |
我們傳進(jìn)去了一個(gè)scheduler類(lèi),scheduler是一個(gè)調(diào)度類(lèi),能夠延時(shí)或周期性地去執(zhí)行一個(gè)任務(wù)。
scheduler有如下類(lèi)型:
類(lèi)型 | 使用方式 | 含義 | 使用場(chǎng)景 |
---|---|---|---|
ioscheduler | schedulers.io() | io操作線(xiàn)程 | 讀寫(xiě)sd卡文件,查詢(xún)數(shù)據(jù)庫(kù),訪(fǎng)問(wèn)網(wǎng)絡(luò)等io密集型操作 |
newthreadscheduler | schedulers.newthread() | 創(chuàng)建新線(xiàn)程 | 耗時(shí)操作等 |
singlescheduler | schedulers.single() | 單例線(xiàn)程 | 只需一個(gè)單例線(xiàn)程時(shí) |
computationscheduler | schedulers.computation() | cpu計(jì)算操作線(xiàn)程 | 圖片壓縮取樣、xml,json解析等cpu密集型計(jì)算 |
trampolinescheduler | schedulers.trampoline() | 當(dāng)前線(xiàn)程 | 需要在當(dāng)前線(xiàn)程立即執(zhí)行任務(wù)時(shí) |
handlerscheduler | androidschedulers.mainthread() | android主線(xiàn)程 | 更新ui等 |
接著就沒(méi)什么了,只是返回一個(gè)observablesubscribeon對(duì)象而已。
observeon()
首先看源碼如下:
1
2
3
4
5
6
7
8
9
|
public final observable<t> observeon(scheduler scheduler) { return observeon(scheduler, false , buffersize()); } public final observable<t> observeon(scheduler scheduler, boolean delayerror, int buffersize) { objecthelper.requirenonnull(scheduler, "scheduler is null" ); objecthelper.verifypositive(buffersize, "buffersize" ); return rxjavaplugins.onassembly( new observableobserveon<t>( this , scheduler, delayerror, buffersize)); } |
這里也是沒(méi)什么,只是最終返回一個(gè)observableobserveon對(duì)象而已。
接著還是像原來(lái)那樣調(diào)用subscribe()方法進(jìn)行訂閱,看起來(lái)好像整體變化不大,就是封裝了一些對(duì)象而已,不過(guò)著恰恰是rxjava源碼的精華,當(dāng)他再次調(diào)用subscribeactual()方法時(shí),已經(jīng)不是之前的observablecreate()里subscribeactual方法了,而是最先調(diào)用observableobserveon的subscribeactual()方法,對(duì)應(yīng)源碼如下:
1
2
3
4
5
6
7
8
|
protected void subscribeactual(observer<? super t> observer) { if (scheduler instanceof trampolinescheduler) { source.subscribe(observer); } else { scheduler.worker w = scheduler.createworker(); source.subscribe( new observeonobserver<t>(observer, w, delayerror, buffersize)); } } |
在這里有兩點(diǎn)要講,一點(diǎn)是observeonobserver是執(zhí)行觀察者的線(xiàn)程,后面還會(huì)詳解,然后就是source.subscribe,這個(gè)source.subscribe調(diào)的是observablesubscribeon的subscribe方法,而subscribe方法因?yàn)槔^承的也是observable,是observable里的方法,所以和上面的observablecreate一樣的方法,所以會(huì)調(diào)用observablesubscribeon里的subscribeactual()方法,對(duì)應(yīng)的代碼如下:
1
2
3
4
5
|
public void subscribeactual( final observer<? super t> s) { final subscribeonobserver<t> parent = new subscribeonobserver<t>(s); s.onsubscribe(parent); parent.setdisposable(scheduler.scheduledirect( new subscribetask(parent))); } |
上面代碼中,首先把observeonobserver返回給來(lái)的用subscribeonobserver“包裝”起來(lái),然后在回調(diào)observer的onsubscribe(),就是對(duì)應(yīng)模板代碼的onsubscribe()方法。
接著看subscribetask類(lèi)的源碼:
1
2
3
4
5
6
7
8
9
10
|
final class subscribetask implements runnable { private final subscribeonobserver<t> parent; subscribetask(subscribeonobserver<t> parent) { this .parent = parent; } @override public void run() { source.subscribe(parent); } } |
其中的source.subscribe(parent),就是我們執(zhí)行子線(xiàn)程的回調(diào)方法,對(duì)應(yīng)我們模板代碼里的被觀察者的subscribe()方法。它放在run()方法里,并且繼承runnable,說(shuō)明這個(gè)類(lèi)主要是線(xiàn)程運(yùn)行。接著看scheduler.scheduledirect()方法對(duì)應(yīng)的源碼如下:
1
2
3
4
5
6
7
8
9
10
11
|
public disposable scheduledirect( @nonnull runnable run) { return scheduledirect(run, 0l, timeunit.nanoseconds); } public disposable scheduledirect( @nonnull runnable run, long delay, @nonnull timeunit unit) { final worker w = createworker(); final runnable decoratedrun = rxjavaplugins.onschedule(run); disposetask task = new disposetask(decoratedrun, w); w.schedule(task, delay, unit); return task; } |
在這里,createworker()也是一個(gè)抽象方法,調(diào)用的是我們的調(diào)度類(lèi)對(duì)應(yīng)的schedulers類(lèi)里面的方法,這里是ioscheduler類(lèi),
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
public final class ioscheduler extends scheduler{ final atomicreference<cachedworkerpool> pool; //省略.... public worker createworker() { return new eventloopworker(pool.get()); } static final class eventloopworker extends scheduler.worker { private final compositedisposable tasks; private final cachedworkerpool pool; private final threadworker threadworker; final atomicboolean once = new atomicboolean(); eventloopworker(cachedworkerpool pool) { this .pool = pool; this .tasks = new compositedisposable(); this .threadworker = pool.get(); } //省略.... @nonnull @override public disposable schedule( @nonnull runnable action, long delaytime, @nonnull timeunit unit) { if (tasks.isdisposed()) { // don't schedule, we are unsubscribed return emptydisposable.instance; } return threadworker.scheduleactual(action, delaytime, unit, tasks); } } } static final class cachedworkerpool implements runnable { //省略.... threadworker get() { if (allworkers.isdisposed()) { return shutdown_thread_worker; } while (!expiringworkerqueue.isempty()) { threadworker threadworker = expiringworkerqueue.poll(); if (threadworker != null ) { return threadworker; } } threadworker w = new threadworker(threadfactory); allworkers.add(w); return w; } //省略.... } |
這就是ioscheduler的createworker()的方法,其實(shí)最主要的意思就是獲取線(xiàn)程池,以便于生成子線(xiàn)程,讓subscribetask()可以運(yùn)行。然后直接調(diào)用 w.schedule(task, delay, unit)方法讓它在線(xiàn)程池里執(zhí)行。上面中那threadworker的源碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
static final class threadworker extends newthreadworker { private long expirationtime; threadworker(threadfactory threadfactory) { super (threadfactory); this .expirationtime = 0l; } //省略代碼.... } public class newthreadworker extends scheduler.worker implements disposable { private final scheduledexecutorservice executor; public newthreadworker(threadfactory threadfactory) { executor = schedulerpoolfactory.create(threadfactory); } public scheduledrunnable scheduleactual( final runnable run, long delaytime, @nonnull timeunit unit, @nullable disposablecontainer parent) { runnable decoratedrun = rxjavaplugins.onschedule(run); scheduledrunnable sr = new scheduledrunnable(decoratedrun, parent); if (parent != null ) { if (!parent.add(sr)) { return sr; } } future<?> f; try { if (delaytime <= 0 ) { f = executor.submit((callable<object>)sr); } else { f = executor.schedule((callable<object>)sr, delaytime, unit); } sr.setfuture(f); } catch (rejectedexecutionexception ex) { if (parent != null ) { parent.remove(sr); } rxjavaplugins.onerror(ex); } return sr; } } |
可以看到,這就調(diào)了原始的javaapi來(lái)進(jìn)行線(xiàn)程池操作。
然后最后一環(huán)在子線(xiàn)程調(diào)用source.subscribe(parent)方法,然后回調(diào)剛開(kāi)始創(chuàng)建的observablecreate的subscribeactual(),既:
1
2
3
4
5
6
7
8
9
10
|
protected void subscribeactual(observer<? super t> observer) { createemitter<t> parent = new createemitter<t>(observer); observer.onsubscribe(parent); try { source.subscribe(parent); } catch (throwable ex) { exceptions.throwiffatal(ex); parent.onerror(ex); } } |
進(jìn)行消息的訂閱綁定。
當(dāng)我們?cè)谡{(diào)用 emitter.onnext(內(nèi)容)時(shí),是在io線(xiàn)程里的,那回調(diào)的onnext()又是什么時(shí)候切換的?那就是前面為了整個(gè)流程流暢性沒(méi)講的在observeon()里的observeonobserver是執(zhí)行觀察者的線(xiàn)程的過(guò)程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
class observeonobserver<t> extends basicintqueuedisposable<t> implements observer<t>, runnable { //省略代碼.... observeonobserver(observer<? super t> actual, scheduler.worker worker, boolean delayerror, int buffersize) { this .actual = actual; this .worker = worker; this .delayerror = delayerror; this .buffersize = buffersize; } @override public void onsubscribe(disposable s) { if (disposablehelper.validate( this .s, s)) { this .s = s; if (s instanceof queuedisposable) { @suppresswarnings ( "unchecked" ) queuedisposable<t> qd = (queuedisposable<t>) s; int m = qd.requestfusion(queuedisposable.any | queuedisposable.boundary); if (m == queuedisposable.sync) { sourcemode = m; queue = qd; done = true ; actual.onsubscribe( this ); schedule(); return ; } if (m == queuedisposable.async) { sourcemode = m; queue = qd; actual.onsubscribe( this ); return ; } } queue = new spsclinkedarrayqueue<t>(buffersize); actual.onsubscribe( this ); } } @override public void onnext(t t) { if (done) { return ; } if (sourcemode != queuedisposable.async) { queue.offer(t); } schedule(); } void schedule() { if (getandincrement() == 0 ) { worker.schedule( this ); } } //省略代碼.... } |
當(dāng)調(diào)用emitter.onnext(內(nèi)容)方法,會(huì)調(diào)用上面的onnext()方法,然后在這個(gè)方法里會(huì)把數(shù)據(jù)壓入一個(gè)隊(duì)列,然后執(zhí)行worker.schedule(this)方法,work是什么呢,還記得androidschedulers.mainthread()嗎,這個(gè)對(duì)應(yīng)這個(gè)handlerscheduler這個(gè)類(lèi),所以createworker()對(duì)應(yīng)著:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
private static final class mainholder { static final scheduler default = new handlerscheduler( new handler(looper.getmainlooper())); } public worker createworker() { return new handlerworker(handler); } private static final class handlerworker extends worker { private final handler handler; private volatile boolean disposed; handlerworker(handler handler) { this .handler = handler; } @override public disposable schedule(runnable run, long delay, timeunit unit) { if (run == null ) throw new nullpointerexception( "run == null" ); if (unit == null ) throw new nullpointerexception( "unit == null" ); if (disposed) { return disposables.disposed(); } run = rxjavaplugins.onschedule(run); scheduledrunnable scheduled = new scheduledrunnable(handler, run); message message = message.obtain(handler, scheduled); message.obj = this ; // used as token for batch disposal of this worker's runnables. handler.sendmessagedelayed(message, unit.tomillis(delay)); if (disposed) { handler.removecallbacks(scheduled); return disposables.disposed(); } return scheduled; } } |
在next()方法里,運(yùn)用android自帶的handler消息機(jī)制,通過(guò)把方法包裹在message里,同通過(guò)handler.sendmessagedelayed()發(fā)送消息,就會(huì)在ui線(xiàn)程里回調(diào)next()方法,從而實(shí)現(xiàn)從子線(xiàn)程切換到android主線(xiàn)程的操作。我們?cè)谥骶€(xiàn)程拿到數(shù)據(jù)就可以進(jìn)行各種在主線(xiàn)程的操作了。
總結(jié)一下:
observablecreate 一> observablesubscribeon 一> observableobserveon為初始化順序
當(dāng)調(diào)用observable.subscribe(observer)時(shí)的執(zhí)行順序
observableobserveon 一> observablesubscribeon 一> observablecreate
當(dāng)發(fā)送消息的執(zhí)行順序
observablecreate 一> observablesubscribeon 一> observableobserveon
以上就是消息訂閱和線(xiàn)程切換的源碼的所有講解了。
為了讓你們理解更清楚,我仿照rxjava寫(xiě)了大概的消息訂閱和線(xiàn)程切換的最基本代碼和基本功能,以幫助你們理解
https://github.com/jack921/rxjava2demo
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持服務(wù)器之家。
原文鏈接:https://www.jianshu.com/p/264b68fd96fa