激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務(wù)器之家:專(zhuān)注于服務(wù)器技術(shù)及軟件下載分享
分類(lèi)導(dǎo)航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達(dá)式|C/C++|IOS|C#|Swift|Android|JavaScript|易語(yǔ)言|

服務(wù)器之家 - 編程語(yǔ)言 - Java教程 - RxJava的消息發(fā)送和線(xiàn)程切換實(shí)現(xiàn)原理

RxJava的消息發(fā)送和線(xiàn)程切換實(shí)現(xiàn)原理

2021-06-10 14:16Jack921 Java教程

這篇文章主要介紹了RxJava的消息發(fā)送和線(xiàn)程切換實(shí)現(xiàn)原理,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧

rxjava是一個(gè)在java虛擬機(jī)上的響應(yīng)式擴(kuò)展,通過(guò)使用可觀察的序列將異步和基于事件的程序組合起來(lái)的一個(gè)庫(kù)。

它擴(kuò)展了觀察者模式來(lái)支持?jǐn)?shù)據(jù)/事件序列,并且添加了操作符,這些操作符允許你聲明性地組合序列,同時(shí)抽象出要關(guān)注的問(wèn)題:比如低級(jí)線(xiàn)程、同步、線(xiàn)程安全和并發(fā)數(shù)據(jù)結(jié)構(gòu)等。

rxjava相信大家都非常了解吧,今天分享一下rxjava的消息發(fā)送和線(xiàn)程源碼的分析。最后并分享一個(gè)相關(guān)demo,讓大家更加熟悉我們天天都在用的框架。

消息訂閱發(fā)送

首先讓我們看看消息訂閱發(fā)送最基本的代碼組成:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
observable observable = observable.create(new observableonsubscribe<string>() {
     @override
     public void subscribe(observableemitter<string> emitter) throws exception {
       emitter.onnext("jack1");
       emitter.onnext("jack2");
       emitter.onnext("jack3");
       emitter.oncomplete();
     }
   });
 
   observer<string> observer = new observer<string>() {
     @override
     public void onsubscribe(disposable d) {
       log.d(tag, "onsubscribe");
     }
 
     @override
     public void onnext(string s) {
       log.d(tag, "onnext : " + s);
     }
 
     @override
     public void onerror(throwable e) {
       log.d(tag, "onerror : " + e.tostring());
     }
 
     @override
     public void oncomplete() {
       log.d(tag, "oncomplete");
     }
   };
 
   observable.subscribe(observer);

代碼很簡(jiǎn)單,observable為被觀察者,observer為觀察者,然后通過(guò)observable.subscribe(observer),把觀察者和被觀察者關(guān)聯(lián)起來(lái)。被觀察者發(fā)送消息(emitter.onnext("內(nèi)容")),觀察者就可以在onnext()方法里回調(diào)出來(lái)。

我們先來(lái)看observable,創(chuàng)建是用observable.create()方法進(jìn)行創(chuàng)建,源碼如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static <t> observable<t> create(observableonsubscribe<t> source) {
  objecthelper.requirenonnull(source, "source is null");
  return rxjavaplugins.onassembly(new observablecreate<t>(source));
}
 
public static <t> t requirenonnull(t object, string message) {
  if (object == null) {
     throw new nullpointerexception(message);
  }
  return object;
 }
 
public static <t> observable<t> onassembly(@nonnull observable<t> source) {
  function<? super observable, ? extends observable> f = onobservableassembly;
  if (f != null) {
     return apply(f, source);
  }
  return source;
}

可以看出,create()方法里最主要的還是創(chuàng)建用observableonsubscribe傳入創(chuàng)建了一個(gè)observablecreate對(duì)象并且保存而已。

?
1
2
3
4
5
6
7
8
public final class observablecreate<t> extends observable<t> {
  final observableonsubscribe<t> source;
 
  public observablecreate(observableonsubscribe<t> source) {
    this.source = source;
  }
 
}

接著是創(chuàng)建observer,這比較簡(jiǎn)單只是單純創(chuàng)建一個(gè)接口對(duì)象而已

?
1
2
3
4
5
6
7
8
9
public interface observer<t> {
  void onsubscribe(@nonnull disposable d);
 
  void onnext(@nonnull t t);
 
  void onerror(@nonnull throwable e);
  
  void oncomplete();
}

訂閱發(fā)送消息

observable.subscribe(observer)的subscribe方法如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public final void subscribe(observer<? super t> observer) {
  objecthelper.requirenonnull(observer, "observer is null");
  try {
    observer = rxjavaplugins.onsubscribe(this, observer);
    objecthelper.requirenonnull(observer, "plugin returned null observer");
    subscribeactual(observer);
  } catch (nullpointerexception e) { // nopmd
    throw e;
  } catch (throwable e) {
    exceptions.throwiffatal(e);
    rxjavaplugins.onerror(e);
    nullpointerexception npe = new nullpointerexception("actually not, but can't throw other exceptions due to rs");
    npe.initcause(e);
    throw npe;
  }
}
 
//objecthelper.requirenonnull()方法
public static <t> t requirenonnull(t object, string message) {
  if (object == null) {
     throw new nullpointerexception(message);
  }
  return object;
}
 
//rxjavaplugins.onsubscribe()方法
public static <t> observer<? super t> onsubscribe(@nonnull observable<t> source, @nonnull observer<? super t> observer) {
  bifunction<? super observable, ? super observer, ? extends observer> f = onobservablesubscribe;
  if (f != null) {
    return apply(f, source, observer);
  }
  return observer;
}

從上面源碼可以看出requirenonnull()只是做非空判斷而已,而rxjavaplugins.onsubscribe()也只是返回最終的觀察者而已。所以關(guān)鍵代碼是抽象方法subscribeactual(observer);那么subscribeactual對(duì)應(yīng)哪個(gè)代碼段呢?

還記得observable.create()創(chuàng)建的observablecreate類(lèi)嗎,這就是subscribeactual()具體實(shí)現(xiàn)類(lèi),源碼如下:

?
1
2
3
4
5
6
7
8
9
10
protected void subscribeactual(observer<? super t> observer) {
  createemitter<t> parent = new createemitter<t>(observer);
  observer.onsubscribe(parent);
  try {
    source.subscribe(parent);
  } catch (throwable ex) {
    exceptions.throwiffatal(ex);
    parent.onerror(ex);
  }
}

從上面的代碼可以看出,首先創(chuàng)建了一個(gè)createemitter對(duì)象并傳入observer,然后回到observer的onsubscribe()方法,而source就是我們之前創(chuàng)建observablecreate傳入的observableonsubscribe對(duì)象。

?
1
2
3
4
class createemitter<t> extends atomicreference<disposable>
  implements observableemitter<t>, disposable {
 
 }

而createemitter又繼承observableemitter接口,又回調(diào)observableonsubscribe的subscribe方法,對(duì)應(yīng)著我們的:

?
1
2
3
4
5
6
7
8
9
observable observable = observable.create(new observableonsubscribe<string>() {
   @override
   public void subscribe(observableemitter<string> emitter) throws exception {
      emitter.onnext("jack1");
      emitter.onnext("jack2");
      emitter.onnext("jack3");
      emitter.oncomplete();
   }
});

當(dāng)它發(fā)送消息既調(diào)用emitter.onnext()方法時(shí),既調(diào)用了createemitter的onnext()方法:

?
1
2
3
4
5
6
7
8
9
public void onnext(t t) {
  if (t == null) {
    onerror(new nullpointerexception("onnext called with null. null values are generally not allowed in 2.x operators and sources."));
    return;
  }
  if (!isdisposed()) {
    observer.onnext(t);
  }
}

可以看到最終又回調(diào)了觀察者的onnext()方法,把被觀察者的數(shù)據(jù)傳輸給了觀察者。有人會(huì)問(wèn)

isdisposed()是什么意思,是判斷要不要終止傳遞的,我們看emitter.oncomplete()源碼:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void oncomplete() {
  if (!isdisposed()) {
    try {
      observer.oncomplete();
    } finally {
      dispose();
    }
  }
}
 
public static boolean dispose(atomicreference<disposable> field) {
    disposable current = field.get();
    disposable d = disposed;
    if (current != d) {
      current = field.getandset(d);
      if (current != d) {
        if (current != null) {
          current.dispose();
        }
        return true;
      }
    }
    return false;
 }
 
public static boolean isdisposed(disposable d) {
    return d == disposed;
}

dispose()方法是終止消息傳遞,也就付了個(gè)disposed常量,而isdisposed()方法就是判斷這個(gè)常量而已。這就是整個(gè)消息訂閱發(fā)送的過(guò)程,用的是觀察者模式。

線(xiàn)程切換

在上面模板代碼的基礎(chǔ)上,線(xiàn)程切換只是改變了如下代碼:

?
1
2
3
observable.subscribeon(schedulers.io())
     .observeon(androidschedulers.mainthread())
     .subscribe(observer);

下面我們對(duì)線(xiàn)程切換的源碼進(jìn)行一下分析,分為兩部分:subscribeon()和observeon()

subscribeon()

首先是subscribeon()源碼如下:

?
1
2
3
4
public final observable<t> subscribeon(scheduler scheduler) {
  objecthelper.requirenonnull(scheduler, "scheduler is null");
  return rxjavaplugins.onassembly(new observablesubscribeon<t>(this, scheduler));
}

我們傳進(jìn)去了一個(gè)scheduler類(lèi),scheduler是一個(gè)調(diào)度類(lèi),能夠延時(shí)或周期性地去執(zhí)行一個(gè)任務(wù)。

scheduler有如下類(lèi)型:

 

類(lèi)型 使用方式 含義 使用場(chǎng)景
ioscheduler schedulers.io() io操作線(xiàn)程 讀寫(xiě)sd卡文件,查詢(xún)數(shù)據(jù)庫(kù),訪(fǎng)問(wèn)網(wǎng)絡(luò)等io密集型操作
newthreadscheduler schedulers.newthread() 創(chuàng)建新線(xiàn)程 耗時(shí)操作等
singlescheduler schedulers.single() 單例線(xiàn)程 只需一個(gè)單例線(xiàn)程時(shí)
computationscheduler schedulers.computation() cpu計(jì)算操作線(xiàn)程 圖片壓縮取樣、xml,json解析等cpu密集型計(jì)算
trampolinescheduler schedulers.trampoline() 當(dāng)前線(xiàn)程 需要在當(dāng)前線(xiàn)程立即執(zhí)行任務(wù)時(shí)
handlerscheduler androidschedulers.mainthread() android主線(xiàn)程 更新ui等

 

接著就沒(méi)什么了,只是返回一個(gè)observablesubscribeon對(duì)象而已。

observeon()

首先看源碼如下:

?
1
2
3
4
5
6
7
8
9
public final observable<t> observeon(scheduler scheduler) {
  return observeon(scheduler, false, buffersize());
}
 
public final observable<t> observeon(scheduler scheduler, boolean delayerror, int buffersize) {
  objecthelper.requirenonnull(scheduler, "scheduler is null");
  objecthelper.verifypositive(buffersize, "buffersize");
  return rxjavaplugins.onassembly(new observableobserveon<t>(this, scheduler, delayerror, buffersize));
}

這里也是沒(méi)什么,只是最終返回一個(gè)observableobserveon對(duì)象而已。

接著還是像原來(lái)那樣調(diào)用subscribe()方法進(jìn)行訂閱,看起來(lái)好像整體變化不大,就是封裝了一些對(duì)象而已,不過(guò)著恰恰是rxjava源碼的精華,當(dāng)他再次調(diào)用subscribeactual()方法時(shí),已經(jīng)不是之前的observablecreate()里subscribeactual方法了,而是最先調(diào)用observableobserveon的subscribeactual()方法,對(duì)應(yīng)源碼如下:

?
1
2
3
4
5
6
7
8
protected void subscribeactual(observer<? super t> observer) {
  if (scheduler instanceof trampolinescheduler) {
    source.subscribe(observer);
  } else {
    scheduler.worker w = scheduler.createworker();
    source.subscribe(new observeonobserver<t>(observer, w, delayerror, buffersize));
  }
}

在這里有兩點(diǎn)要講,一點(diǎn)是observeonobserver是執(zhí)行觀察者的線(xiàn)程,后面還會(huì)詳解,然后就是source.subscribe,這個(gè)source.subscribe調(diào)的是observablesubscribeon的subscribe方法,而subscribe方法因?yàn)槔^承的也是observable,是observable里的方法,所以和上面的observablecreate一樣的方法,所以會(huì)調(diào)用observablesubscribeon里的subscribeactual()方法,對(duì)應(yīng)的代碼如下:

?
1
2
3
4
5
public void subscribeactual(final observer<? super t> s) {
  final subscribeonobserver<t> parent = new subscribeonobserver<t>(s);
  s.onsubscribe(parent);
  parent.setdisposable(scheduler.scheduledirect(new subscribetask(parent)));
}

上面代碼中,首先把observeonobserver返回給來(lái)的用subscribeonobserver“包裝”起來(lái),然后在回調(diào)observer的onsubscribe(),就是對(duì)應(yīng)模板代碼的onsubscribe()方法。

接著看subscribetask類(lèi)的源碼:

?
1
2
3
4
5
6
7
8
9
10
final class subscribetask implements runnable {
  private final subscribeonobserver<t> parent;
  subscribetask(subscribeonobserver<t> parent) {
    this.parent = parent;
  }
  @override
  public void run() {
    source.subscribe(parent);
  }
}

其中的source.subscribe(parent),就是我們執(zhí)行子線(xiàn)程的回調(diào)方法,對(duì)應(yīng)我們模板代碼里的被觀察者的subscribe()方法。它放在run()方法里,并且繼承runnable,說(shuō)明這個(gè)類(lèi)主要是線(xiàn)程運(yùn)行。接著看scheduler.scheduledirect()方法對(duì)應(yīng)的源碼如下:

?
1
2
3
4
5
6
7
8
9
10
11
public disposable scheduledirect(@nonnull runnable run) {
  return scheduledirect(run, 0l, timeunit.nanoseconds);
}
 
public disposable scheduledirect(@nonnull runnable run, long delay, @nonnull timeunit unit) {
  final worker w = createworker();
  final runnable decoratedrun = rxjavaplugins.onschedule(run);
  disposetask task = new disposetask(decoratedrun, w);
  w.schedule(task, delay, unit);
  return task;
}

在這里,createworker()也是一個(gè)抽象方法,調(diào)用的是我們的調(diào)度類(lèi)對(duì)應(yīng)的schedulers類(lèi)里面的方法,這里是ioscheduler類(lèi),

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public final class ioscheduler extends scheduler{
 
  final atomicreference<cachedworkerpool> pool;
 
  //省略....
 
  public worker createworker() {
    return new eventloopworker(pool.get());
  }
 
  static final class eventloopworker extends scheduler.worker {
    private final compositedisposable tasks;
    private final cachedworkerpool pool;
    private final threadworker threadworker;
 
    final atomicboolean once = new atomicboolean();
 
    eventloopworker(cachedworkerpool pool) {
      this.pool = pool;
      this.tasks = new compositedisposable();
      this.threadworker = pool.get();
    }
 
    //省略....
 
    @nonnull
    @override
    public disposable schedule(@nonnull runnable action, long delaytime, @nonnull timeunit unit) {
      if (tasks.isdisposed()) {
        // don't schedule, we are unsubscribed
        return emptydisposable.instance;
      }
      return threadworker.scheduleactual(action, delaytime, unit, tasks);
    }
  }
 
}
 
 static final class cachedworkerpool implements runnable {
 
  //省略....
 
  threadworker get() {
    if (allworkers.isdisposed()) {
      return shutdown_thread_worker;
    }
    while (!expiringworkerqueue.isempty()) {
      threadworker threadworker = expiringworkerqueue.poll();
      if (threadworker != null) {
        return threadworker;
      }
    }
 
    threadworker w = new threadworker(threadfactory);
    allworkers.add(w);
    return w;
   }
   //省略....
}

這就是ioscheduler的createworker()的方法,其實(shí)最主要的意思就是獲取線(xiàn)程池,以便于生成子線(xiàn)程,讓subscribetask()可以運(yùn)行。然后直接調(diào)用 w.schedule(task, delay, unit)方法讓它在線(xiàn)程池里執(zhí)行。上面中那threadworker的源碼如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static final class threadworker extends newthreadworker {
  private long expirationtime;
  threadworker(threadfactory threadfactory) {
    super(threadfactory);
    this.expirationtime = 0l;
  }
 
  //省略代碼....
 }
 
public class newthreadworker extends scheduler.worker implements disposable {
  private final scheduledexecutorservice executor;
 
  public newthreadworker(threadfactory threadfactory) {
    executor = schedulerpoolfactory.create(threadfactory);
  }
 
  public scheduledrunnable scheduleactual(final runnable run, long delaytime, @nonnull timeunit unit, @nullable disposablecontainer parent) {
    runnable decoratedrun = rxjavaplugins.onschedule(run);
 
    scheduledrunnable sr = new scheduledrunnable(decoratedrun, parent);
 
    if (parent != null) {
      if (!parent.add(sr)) {
        return sr;
      }
    }
 
    future<?> f;
    try {
      if (delaytime <= 0) {
        f = executor.submit((callable<object>)sr);
      } else {
        f = executor.schedule((callable<object>)sr, delaytime, unit);
      }
      sr.setfuture(f);
    } catch (rejectedexecutionexception ex) {
      if (parent != null) {
        parent.remove(sr);
      }
      rxjavaplugins.onerror(ex);
    }
 
    return sr;
  }
}

可以看到,這就調(diào)了原始的javaapi來(lái)進(jìn)行線(xiàn)程池操作。

然后最后一環(huán)在子線(xiàn)程調(diào)用source.subscribe(parent)方法,然后回調(diào)剛開(kāi)始創(chuàng)建的observablecreate的subscribeactual(),既:

?
1
2
3
4
5
6
7
8
9
10
protected void subscribeactual(observer<? super t> observer) {
    createemitter<t> parent = new createemitter<t>(observer);
    observer.onsubscribe(parent);
    try {
      source.subscribe(parent);
    } catch (throwable ex) {
      exceptions.throwiffatal(ex);
      parent.onerror(ex);
    }
}

進(jìn)行消息的訂閱綁定。

當(dāng)我們?cè)谡{(diào)用 emitter.onnext(內(nèi)容)時(shí),是在io線(xiàn)程里的,那回調(diào)的onnext()又是什么時(shí)候切換的?那就是前面為了整個(gè)流程流暢性沒(méi)講的在observeon()里的observeonobserver是執(zhí)行觀察者的線(xiàn)程的過(guò)程。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class observeonobserver<t> extends basicintqueuedisposable<t>
  implements observer<t>, runnable {
 
    //省略代碼....
 
    observeonobserver(observer<? super t> actual, scheduler.worker worker, boolean delayerror, int buffersize) {
      this.actual = actual;
      this.worker = worker;
      this.delayerror = delayerror;
      this.buffersize = buffersize;
    }
 
    @override
    public void onsubscribe(disposable s) {
      if (disposablehelper.validate(this.s, s)) {
        this.s = s;
        if (s instanceof queuedisposable) {
          @suppresswarnings("unchecked")
          queuedisposable<t> qd = (queuedisposable<t>) s;
          int m = qd.requestfusion(queuedisposable.any | queuedisposable.boundary);
          if (m == queuedisposable.sync) {
            sourcemode = m;
            queue = qd;
            done = true;
            actual.onsubscribe(this);
            schedule();
            return;
          }
          if (m == queuedisposable.async) {
            sourcemode = m;
            queue = qd;
            actual.onsubscribe(this);
            return;
          }
        }
        queue = new spsclinkedarrayqueue<t>(buffersize);
        actual.onsubscribe(this);
      }
    }
 
    @override
    public void onnext(t t) {
      if (done) {
        return;
      }
      if (sourcemode != queuedisposable.async) {
        queue.offer(t);
      }
      schedule();
    
 
    void schedule() {
      if (getandincrement() == 0) {
        worker.schedule(this);
      }
    }
    //省略代碼....
  }

當(dāng)調(diào)用emitter.onnext(內(nèi)容)方法,會(huì)調(diào)用上面的onnext()方法,然后在這個(gè)方法里會(huì)把數(shù)據(jù)壓入一個(gè)隊(duì)列,然后執(zhí)行worker.schedule(this)方法,work是什么呢,還記得androidschedulers.mainthread()嗎,這個(gè)對(duì)應(yīng)這個(gè)handlerscheduler這個(gè)類(lèi),所以createworker()對(duì)應(yīng)著:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private static final class mainholder {
    static final scheduler default = new handlerscheduler(new handler(looper.getmainlooper()));
}
 
 
public worker createworker() {
  return new handlerworker(handler);
}
 
private static final class handlerworker extends worker {
    private final handler handler;
    private volatile boolean disposed;
 
    handlerworker(handler handler) {
      this.handler = handler;
    }
 
    @override
    public disposable schedule(runnable run, long delay, timeunit unit) {
      if (run == null) throw new nullpointerexception("run == null");
      if (unit == null) throw new nullpointerexception("unit == null");
      if (disposed) {
        return disposables.disposed();
      }
      run = rxjavaplugins.onschedule(run);
      scheduledrunnable scheduled = new scheduledrunnable(handler, run);
      message message = message.obtain(handler, scheduled);
      message.obj = this; // used as token for batch disposal of this worker's runnables.
      handler.sendmessagedelayed(message, unit.tomillis(delay));
      if (disposed) {
        handler.removecallbacks(scheduled);
        return disposables.disposed();
      }
      return scheduled;
    }
}

在next()方法里,運(yùn)用android自帶的handler消息機(jī)制,通過(guò)把方法包裹在message里,同通過(guò)handler.sendmessagedelayed()發(fā)送消息,就會(huì)在ui線(xiàn)程里回調(diào)next()方法,從而實(shí)現(xiàn)從子線(xiàn)程切換到android主線(xiàn)程的操作。我們?cè)谥骶€(xiàn)程拿到數(shù)據(jù)就可以進(jìn)行各種在主線(xiàn)程的操作了。

總結(jié)一下:

RxJava的消息發(fā)送和線(xiàn)程切換實(shí)現(xiàn)原理

observablecreate 一> observablesubscribeon 一> observableobserveon為初始化順序

當(dāng)調(diào)用observable.subscribe(observer)時(shí)的執(zhí)行順序
observableobserveon 一> observablesubscribeon 一> observablecreate

當(dāng)發(fā)送消息的執(zhí)行順序
observablecreate 一> observablesubscribeon 一> observableobserveon

以上就是消息訂閱和線(xiàn)程切換的源碼的所有講解了。

為了讓你們理解更清楚,我仿照rxjava寫(xiě)了大概的消息訂閱和線(xiàn)程切換的最基本代碼和基本功能,以幫助你們理解

https://github.com/jack921/rxjava2demo

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持服務(wù)器之家。

原文鏈接:https://www.jianshu.com/p/264b68fd96fa

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 亚洲资源在线播放 | 日韩精品久久久久久久电影99爱 | 亚洲综合视频网 | 久久久久北条麻妃免费看 | 日韩黄色免费电影 | 国产美女做爰免费视 | 黄色片免费在线播放 | 超碰97人人艹 | 在线看免费观看日本 | 免费毛片视频播放 | 欧美黄色性视频 | 国产视频在线观看一区二区三区 | 九九热精品在线 | 毛片视频在线免费观看 | 国产精品刺激对白麻豆99 | 日韩视频区 | www.99re1.com| 亚洲福利在线观看视频 | 欧美a区| 成人短视频在线播放 | 男女做性免费网站 | 久久一本日日摸夜夜添 | 99欧美视频 | 国产一级在线免费观看 | 91精品国产乱码久久桃 | 一级空姐毛片 | 日韩色电影 | 操操日日 | 国产精品视频一区二区三区综合 | 天天夜碰日日摸日日澡性色av | 国产一区亚洲 | 日本网站在线看 | 欧美一级精品 | 黄色大片在线观看 | 露脸各种姿势啪啪的清纯美女 | 欧美一级黄视频 | 国产1区2| 成人毛片久久 | 91精品成人福利在线播放 | 我爱我色成人网 | 亚洲最大av网站 |