激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

香港云服务器
服務器之家 - 編程語言 - Java教程 - redisson實現(xiàn)分布式鎖原理

redisson實現(xiàn)分布式鎖原理

2020-08-06 14:51min.jiang Java教程

本文將詳細介紹redisson實現(xiàn)分布式鎖原理。具有很好的參考價值,下面跟著小編一起來看下吧

Redisson分布式鎖

之前的基于注解的鎖有一種鎖是基本redis的分布式鎖,鎖的實現(xiàn)我是基于redisson組件提供的RLock,這篇來看看redisson是如何實現(xiàn)鎖的。

不同版本實現(xiàn)鎖的機制并不相同

引用的redisson最近發(fā)布的版本3.2.3,不同的版本可能實現(xiàn)鎖的機制并不相同,早期版本好像是采用簡單的setnx,getset等常規(guī)命令來配置完成,而后期由于redis支持了腳本Lua變更了實現(xiàn)原理。

?
1
2
3
4
5
<dependency>
 <groupId>org.redisson</groupId>
 <artifactId>redisson</artifactId>
 <version>3.2.3</version>
</dependency>

setnx需要配合getset以及事務來完成,這樣才能比較好的避免死鎖問題,而新版本由于支持lua腳本,可以避免使用事務以及操作多個redis命令,語義表達更加清晰一些。

RLock接口的特點

繼承標準接口Lock

擁有標準鎖接口的所有特性,比如lock,unlock,trylock等等。

擴展標準接口Lock

擴展了很多方法,常用的主要有:強制鎖釋放,帶有效期的鎖,還有一組異步的方法。其中前面兩個方法主要是解決標準lock可能造成的死鎖問題。比如某個線程獲取到鎖之后,線程所在機器死機,此時獲取了鎖的線程無法正常釋放鎖導致其余的等待鎖的線程一直等待下去。

可重入機制

各版本實現(xiàn)有差異,可重入主要考慮的是性能,同一線程在未釋放鎖時如果再次申請鎖資源不需要走申請流程,只需要將已經(jīng)獲取的鎖繼續(xù)返回并且記錄上已經(jīng)重入的次數(shù)即可,與jdk里面的ReentrantLock功能類似。重入次數(shù)靠hincrby命令來配合使用,詳細的參數(shù)下面的代碼。

怎么判斷是同一線程?

redisson的方案是,RedissonLock實例的一個guid再加當前線程的id,通過getLockName返回

?
1
2
3
4
5
6
7
8
9
10
11
public class RedissonLock extends RedissonExpirable implements RLock {
 final UUID id;
 protected RedissonLock(CommandExecutor commandExecutor, String name, UUID id) {
  super(commandExecutor, name);
  this.internalLockLeaseTime = TimeUnit.SECONDS.toMillis(30L);
  this.commandExecutor = commandExecutor;
  this.id = id;
 }
 String getLockName(long threadId) {
  return this.id + ":" + threadId;
 }

RLock獲取鎖的兩種場景

這里拿tryLock的源碼來看:tryAcquire方法是申請鎖并返回鎖有效期還剩余的時間,如果為空說明鎖未被其它線程申請直接獲取并返回,如果獲取到時間,則進入等待競爭邏輯。

?
1
2
3
4
5
6
7
8
9
10
11
12
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  long time = unit.toMillis(waitTime);
  long current = System.currentTimeMillis();
  final long threadId = Thread.currentThread().getId();
  Long ttl = this.tryAcquire(leaseTime, unit);
  if(ttl == null) {
   //直接獲取到鎖
   return true;
  } else {
   //有競爭的后續(xù)看
  }
 }

無競爭,直接獲取鎖

先看下首先獲取鎖并釋放鎖背后的redis都在做什么,可以利用redis的monitor來在后臺監(jiān)控redis的執(zhí)行情況。當我們在方法了增加@RequestLockable之后,其實就是調用lock以及unlock,下面是redis命令:

加鎖

由于高版本的redis支持lua腳本,所以redisson也對其進行了支持,采用了腳本模式,不熟悉lua腳本的可以去查找下。執(zhí)行l(wèi)ua命令的邏輯如下:

?
1
2
3
4
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    this.internalLockLeaseTime = unit.toMillis(leaseTime);
    return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call(\'exists\', KEYS[1]) == 0) then redis.call(\'hset\', KEYS[1], ARGV[2], 1); redis.call(\'pexpire\', KEYS[1], ARGV[1]); return nil; end; if (redis.call(\'hexists\', KEYS[1], ARGV[2]) == 1) then redis.call(\'hincrby\', KEYS[1], ARGV[2], 1); redis.call(\'pexpire\', KEYS[1], ARGV[1]); return nil; end; return redis.call(\'pttl\', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{Long.valueOf(this.internalLockLeaseTime), this.getLockName(threadId)});
  }

加鎖的流程:

  1. 判斷l(xiāng)ock鍵是否存在,不存在直接調用hset存儲當前線程信息并且設置過期時間,返回nil,告訴客戶端直接獲取到鎖。
  2. 判斷l(xiāng)ock鍵是否存在,存在則將重入次數(shù)加1,并重新設置過期時間,返回nil,告訴客戶端直接獲取到鎖。
  3. 被其它線程已經(jīng)鎖定,返回鎖有效期的剩余時間,告訴客戶端需要等待。
?
1
2
3
4
5
6
7
8
9
10
11
12
"EVAL"
"if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil; end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil; end;
return redis.call('pttl', KEYS[1]);"
 "1" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"
 "1000" "346e1eb8-5bfd-4d49-9870-042df402f248:21"

上面的lua腳本會轉換成真正的redis命令,下面的是經(jīng)過lua腳本運算之后實際執(zhí)行的redis命令。

?
1
2
3
4
1486642677.053488 [0 lua] "exists" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"
1486642677.053515 [0 lua] "hset" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"
"346e1eb8-5bfd-4d49-9870-042df402f248:21" "1"
1486642677.053540 [0 lua] "pexpire" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" "1000"

解鎖

解鎖的流程看起來復雜些:

  1. 如果lock鍵不存在,發(fā)消息說鎖已經(jīng)可用
  2. 如果鎖不是被當前線程鎖定,則返回nil
  3. 由于支持可重入,在解鎖時將重入次數(shù)需要減1
  4. 如果計算后的重入次數(shù)>0,則重新設置過期時間
  5. 如果計算后的重入次數(shù)<=0,則發(fā)消息說鎖已經(jīng)可用
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
"EVAL"
"if (redis.call('exists', KEYS[1]) == 0) then
 redis.call('publish', KEYS[2], ARGV[1]);
 return 1; end;
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0;
else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end;
return nil;"
"2" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"
"redisson_lock__channel:{lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0}"
 "0" "1000"
 "346e1eb8-5bfd-4d49-9870-042df402f248:21"

無競爭情況下解鎖redis命令:

主要是發(fā)送一個解鎖的消息,以此喚醒等待隊列中的線程重新競爭鎖。

?
1
2
1486642678.493691 [0 lua] "exists" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"
1486642678.493712 [0 lua] "publish" "redisson_lock__channel:{lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0}" "0"

有競爭,等待

有競爭的情況在redis端的lua腳本是相同的,只是不同的條件執(zhí)行不同的redis命令,復雜的在redisson的源碼上。當通過tryAcquire發(fā)現(xiàn)鎖被其它線程申請時,需要進入等待競爭邏輯中。

  • this.await返回false,說明等待時間已經(jīng)超出獲取鎖最大等待時間,取消訂閱并返回獲取鎖失敗
  • this.await返回true,進入循環(huán)嘗試獲取鎖。
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    final long threadId = Thread.currentThread().getId();
    Long ttl = this.tryAcquire(leaseTime, unit);
    if(ttl == null) {
      return true;
    } else {
      //重點是這段
      time -= System.currentTimeMillis() - current;
      if(time <= 0L) {
        return false;
      } else {
        current = System.currentTimeMillis();
        final RFuture subscribeFuture = this.subscribe(threadId);
        if(!this.await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
          if(!subscribeFuture.cancel(false)) {
            subscribeFuture.addListener(new FutureListener() {
              public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                if(subscribeFuture.isSuccess()) {
                  RedissonLock.this.unsubscribe(subscribeFuture, threadId);
                }
              }
            });
          }
          return false;
        } else {
          boolean var16;
          try {
            time -= System.currentTimeMillis() - current;
            if(time <= 0L) {
              boolean currentTime1 = false;
              return currentTime1;
            }
            do {
              long currentTime = System.currentTimeMillis();
              ttl = this.tryAcquire(leaseTime, unit);
              if(ttl == null) {
                var16 = true;
                return var16;
              }
              time -= System.currentTimeMillis() - currentTime;
              if(time <= 0L) {
                var16 = false;
                return var16;
              }
              currentTime = System.currentTimeMillis();
              if(ttl.longValue() >= 0L && ttl.longValue() < time) {
                this.getEntry(threadId).getLatch().tryAcquire(ttl.longValue(), TimeUnit.MILLISECONDS);
              } else {
                this.getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
              }
              time -= System.currentTimeMillis() - currentTime;
            } while(time > 0L);
            var16 = false;
          } finally {
            this.unsubscribe(subscribeFuture, threadId);
          }
          return var16;
        }
      }
    }
  }

循環(huán)嘗試一般有如下幾種方法:

  • while循環(huán),一次接著一次的嘗試,這個方法的缺點是會造成大量無效的鎖申請。
  • Thread.sleep,在上面的while方案中增加睡眠時間以降低鎖申請次數(shù),缺點是這個睡眠的時間設置比較難控制。
  • 基于信息量,當鎖被其它資源占用時,當前線程訂閱鎖的釋放事件,一旦鎖釋放會發(fā)消息通知待等待的鎖進行競爭,有效的解決了無效的鎖申請情況。核心邏輯是this.getEntry(threadId).getLatch().tryAcquire,this.getEntry(threadId).getLatch()返回的是一個信號量,有興趣可以再研究研究。

redisson依賴

由于redisson不光是針對鎖,提供了很多客戶端操作redis的方法,所以會依賴一些其它的框架,比如netty,如果只是簡單的使用鎖也可以自己去實現(xiàn)。

 以上就是本文的全部內(nèi)容,希望本文的內(nèi)容對大家的學習或者工作能帶來一定的幫助,同時也希望多多支持服務器之家!

原文鏈接:http://www.cnblogs.com/ASPNET2008/p/6385249.html

延伸 · 閱讀

精彩推薦
586
Weibo Article 1 Weibo Article 2 Weibo Article 3 Weibo Article 4 Weibo Article 5 Weibo Article 6 Weibo Article 7 Weibo Article 8 Weibo Article 9 Weibo Article 10 Weibo Article 11 Weibo Article 12 Weibo Article 13 Weibo Article 14 Weibo Article 15 Weibo Article 16 Weibo Article 17 Weibo Article 18 Weibo Article 19 Weibo Article 20 Weibo Article 21 Weibo Article 22 Weibo Article 23 Weibo Article 24 Weibo Article 25
主站蜘蛛池模板: videos韩国| 日本精品一区二区 | 日韩精品羞羞答答 | 91香蕉国产亚洲一区二区三区 | 国产精品久久久久久久模特 | 欧美一区二区三区久久精品视 | 日本不卡一二三区 | 亚洲极色 | 俄罗斯论理片 | 男女羞羞视频在线观看免费 | 国产毛片毛片 | 日韩av一二三区 | 色多多视频导航 | av在线免费观看中文字幕 | 国产喷白浆10p | 欧美国产一级片 | 中文字幕在线日韩 | 久久亚洲第一 | 蜜桃欧美性大片免费视频 | 久久久久成人网 | 国内精品伊人久久 | 91精品观看91久久久久久国产 | 蜜桃视频在线观看视频 | 99seav| 操毛片| 视频久久免费 | 久久精品伊人网 | 久久精品视频在线看99 | 国产成人精品一区在线播放 | 久久蜜桃精品一区二区三区综合网 | 久久久久久久久国产 | 99久久久国产精品免费观看 | 91av99| 黄视频网站免费在线观看 | 久久久日韩精品一区二区三区 | 成年性羞羞视频免费观看无限 | av在线播放免费 | 国产成人在线免费观看视频 | 美女黄色影院 | 久久精品视频国产 | 日韩视频精品一区 |