分布式鎖是啥?
單機鎖的概念:我們正常跑的單機項目(也就是在tomcat下跑一個項目不配置集群)想要在高并發的時候加鎖很容易就可以搞定,java提供了很多的機制例如:synchronized、volatile、ReentrantLock等鎖的機制。
為啥需要分布式鎖:當我們的項目比較龐大的時候,單機版的項目已經不能滿足吞吐量的需求了,需要對項目做負載均衡,有可能還需要對項目進行解耦拆分成不同的服務,那么肯定是做成分布式的項目,分布式的項目因為是不同的程序控制,所以使用java提供的鎖并不能完全保證并發需求,需要借助第三方的框架來實現對并發的阻塞控制,來滿足實際業務的需要。
一、使用分布式鎖要滿足的幾個條件:
1.系統是一個分布式系統(關鍵是分布式,單機的可以使用ReentrantLock或者synchronized代碼塊來實現)
2.共享資源(各個系統訪問同一個資源,資源的載體可能是傳統關系型數據庫或者NoSQL)
3.同步訪問(即有很多個進程同事訪問同一個共享資源。沒有同步訪問,誰管你資源競爭不競爭)
二、應用的場景例子
管理后臺的部署架構(多臺tomcat服務器+redis【多臺tomcat服務器訪問一臺redis】+mysql【多臺tomcat服務器訪問一臺服務器上的mysql】)就滿足使用分布式鎖的條件。多臺服務器要訪問redis全局緩存的資源,如果不使用分布式鎖就會出現問題。 看如下偽代碼:
1
2
3
4
5
6
|
long N=0L; //N從redis獲取值 if(N<5){ N++; //N寫回redis } |
上面的代碼主要實現的功能:
從redis獲取值N,對數值N進行邊界檢查,自加1,然后N寫回redis中。 這種應用場景很常見,像秒殺,全局遞增ID、IP訪問限制等。以IP訪問限制來說,惡意攻擊者可能發起無限次訪問,并發量比較大,分布式環境下對N的邊界檢查就不可靠,因為從redis讀的N可能已經是臟數據。傳統的加鎖的做法(如java的synchronized和Lock)也沒用,因為這是分布式環境,這個同步問題的救火隊員也束手無策。在這危急存亡之秋,分布式鎖終于有用武之地了。
分布式鎖可以基于很多種方式實現,比如zookeeper、redis...。不管哪種方式,他的基本原理是不變的:用一個狀態值表示鎖,對鎖的占用和釋放通過狀態值來標識。
這里主要講如何用redis實現分布式鎖。
三、使用redis的setNX命令實現分布式鎖
1、實現的原理
Redis為單進程單線程模式,采用隊列模式將并發訪問變成串行訪問,且多客戶端對Redis的連接并不存在競爭關系。redis的SETNX命令可以方便的實現分布式鎖。
2、基本命令解析
1)setNX(SET if Not eXists)
語法:
SETNX key value
將 key 的值設為 value ,當且僅當 key 不存在。
若給定的 key 已經存在,則 SETNX 不做任何動作。
SETNX 是『SET if Not eXists』(如果不存在,則 SET)的簡寫
返回值:
設置成功,返回 1 。
設置失敗,返回 0 。
例子:
1
2
3
4
5
6
7
8
9
10
11
|
redis> EXISTS job # job 不存在 ( integer ) 0 redis> SETNX job "programmer" # job 設置成功 ( integer ) 1 redis> SETNX job "code-farmer" # 嘗試覆蓋 job ,失敗 ( integer ) 0 redis> GET job # 沒有被覆蓋 "programmer" |
所以我們使用執行下面的命令
1
|
SETNX lock.foo <current Unix time + lock timeout + 1> |
如返回1,則該客戶端獲得鎖,把lock.foo的鍵值設置為時間值表示該鍵已被鎖定,該客戶端最后可以通過DEL lock.foo來釋放該鎖。
如返回0,表明該鎖已被其他客戶端取得,這時我們可以先返回或進行重試等對方完成或等待鎖超時。
2)getSET
語法:
GETSET key value
將給定 key 的值設為 value ,并返回 key 的舊值(old value)。
當 key 存在但不是字符串類型時,返回一個錯誤。
返回值:
返回給定 key 的舊值。
當 key 沒有舊值時,也即是, key 不存在時,返回 nil 。
3)get
語法:
GET key
返回值:
當 key 不存在時,返回 nil ,否則,返回 key 的值。
如果 key 不是字符串類型,那么返回一個錯誤
四、解決死鎖
上面的鎖定邏輯有一個問題:如果一個持有鎖的客戶端失敗或崩潰了不能釋放鎖,該怎么解決?
我們可以通過鎖的鍵對應的時間戳來判斷這種情況是否發生了,如果當前的時間已經大于lock.foo的值,說明該鎖已失效,可以被重新使用。
發生這種情況時,可不能簡單的通過DEL來刪除鎖,然后再SETNX一次(講道理,刪除鎖的操作應該是鎖擁有這執行的,這里只需要等它超時即可),當多個客戶端檢測到鎖超時后都會嘗試去釋放它,這里就可能出現一個競態條件,讓我們模擬一下這個場景:
C0操作超時了,但它還持有著鎖,C1和C2讀取lock.foo檢查時間戳,先后發現超時了。
C1 發送DEL lock.foo
C1 發送SETNX lock.foo 并且成功了。
C2 發送DEL lock.foo
C2 發送SETNX lock.foo 并且成功了。
這樣一來,C1,C2都拿到了鎖!問題大了!
幸好這種問題是可以避免的,讓我們來看看C3這個客戶端是怎樣做的:
C3發送SETNX lock.foo 想要獲得鎖,由于C0還持有鎖,所以Redis返回給C3一個0
C3發送GET lock.foo 以檢查鎖是否超時了,如果沒超時,則等待或重試。
反之,如果已超時,C3通過下面的操作來嘗試獲得鎖:
GETSET lock.foo <current Unix time + lock timeout + 1>
通過GETSET,C3拿到的時間戳如果仍然是超時的,那就說明,C3如愿以償拿到鎖了。
如果在C3之前,有個叫C4的客戶端比C3快一步執行了上面的操作,那么C3拿到的時間戳是個未超時的值,這時,C3沒有如期獲得鎖,需要再次等待或重試。留意一下,盡管C3沒拿到鎖,但它改寫了C4設置的鎖的超時值,不過這一點非常微小的誤差帶來的影響可以忽略不計。
注意:為了讓分布式鎖的算法更穩鍵些,持有鎖的客戶端在解鎖之前應該再檢查一次自己的鎖是否已經超時,再去做DEL操作,因為可能客戶端因為某個耗時的操作而掛起,操作完的時候鎖因為超時已經被別人獲得,這時就不必解鎖了。
五、代碼實現
expireMsecs 鎖持有超時,防止線程在入鎖以后,無限的執行下去,讓鎖無法釋放
timeoutMsecs 鎖等待超時,防止線程饑餓,永遠沒有入鎖執行代碼的機會
注意:項目里面需要先搭建好redis的相關配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
|
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.dao.DataAccessException; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.StringRedisSerializer; /** * Redis distributed lock implementation. * * @author zhengcanrui */ public class RedisLock { private static Logger logger = LoggerFactory.getLogger(RedisLock. class ); private RedisTemplate redisTemplate; private static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100 ; /** * Lock key path. */ private String lockKey; /** * 鎖超時時間,防止線程在入鎖以后,無限的執行等待 */ private int expireMsecs = 60 * 1000 ; /** * 鎖等待時間,防止線程饑餓 */ private int timeoutMsecs = 10 * 1000 ; private volatile boolean locked = false ; /** * Detailed constructor with default acquire timeout 10000 msecs and lock expiration of 60000 msecs. * * @param lockKey lock key (ex. account:1, ...) */ public RedisLock(RedisTemplate redisTemplate, String lockKey) { this .redisTemplate = redisTemplate; this .lockKey = lockKey + "_lock" ; } /** * Detailed constructor with default lock expiration of 60000 msecs. * */ public RedisLock(RedisTemplate redisTemplate, String lockKey, int timeoutMsecs) { this (redisTemplate, lockKey); this .timeoutMsecs = timeoutMsecs; } /** * Detailed constructor. * */ public RedisLock(RedisTemplate redisTemplate, String lockKey, int timeoutMsecs, int expireMsecs) { this (redisTemplate, lockKey, timeoutMsecs); this .expireMsecs = expireMsecs; } /** * @return lock key */ public String getLockKey() { return lockKey; } private String get( final String key) { Object obj = null ; try { obj = redisTemplate.execute( new RedisCallback<Object>() { @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { StringRedisSerializer serializer = new StringRedisSerializer(); byte [] data = connection.get(serializer.serialize(key)); connection.close(); if (data == null ) { return null ; } return serializer.deserialize(data); } }); } catch (Exception e) { logger.error( "get redis error, key : {}" , key); } return obj != null ? obj.toString() : null ; } private boolean setNX( final String key, final String value) { Object obj = null ; try { obj = redisTemplate.execute( new RedisCallback<Object>() { @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { StringRedisSerializer serializer = new StringRedisSerializer(); Boolean success = connection.setNX(serializer.serialize(key), serializer.serialize(value)); connection.close(); return success; } }); } catch (Exception e) { logger.error( "setNX redis error, key : {}" , key); } return obj != null ? (Boolean) obj : false ; } private String getSet( final String key, final String value) { Object obj = null ; try { obj = redisTemplate.execute( new RedisCallback<Object>() { @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { StringRedisSerializer serializer = new StringRedisSerializer(); byte [] ret = connection.getSet(serializer.serialize(key), serializer.serialize(value)); connection.close(); return serializer.deserialize(ret); } }); } catch (Exception e) { logger.error( "setNX redis error, key : {}" , key); } return obj != null ? (String) obj : null ; } /** * 獲得 lock. * 實現思路: 主要是使用了redis 的setnx命令,緩存了鎖. * reids緩存的key是鎖的key,所有的共享, value是鎖的到期時間(注意:這里把過期時間放在value了,沒有時間上設置其超時時間) * 執行過程: * 1.通過setnx嘗試設置某個key的值,成功(當前沒有這個鎖)則返回,成功獲得鎖 * 2.鎖已經存在則獲取鎖的到期時間,和當前時間比較,超時的話,則設置新的值 * * @return true if lock is acquired, false acquire timeouted * @throws InterruptedException in case of thread interruption */ public synchronized boolean lock() throws InterruptedException { int timeout = timeoutMsecs; while (timeout >= 0 ) { long expires = System.currentTimeMillis() + expireMsecs + 1 ; String expiresStr = String.valueOf(expires); //鎖到期時間 if ( this .setNX(lockKey, expiresStr)) { // lock acquired locked = true ; return true ; } String currentValueStr = this .get(lockKey); //redis里的時間 if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) { //判斷是否為空,不為空的情況下,如果被其他線程設置了值,則第二個條件判斷是過不去的 // lock is expired String oldValueStr = this .getSet(lockKey, expiresStr); //獲取上一個鎖到期時間,并設置現在的鎖到期時間, //只有一個線程才能獲取上一個線上的設置時間,因為jedis.getSet是同步的 if (oldValueStr != null && oldValueStr.equals(currentValueStr)) { //防止誤刪(覆蓋,因為key是相同的)了他人的鎖——這里達不到效果,這里值會被覆蓋,但是因為什么相差了很少的時間,所以可以接受 //[分布式的情況下]:如過這個時候,多個線程恰好都到了這里,但是只有一個線程的設置值和當前值相同,他才有權利獲取鎖 // lock acquired locked = true ; return true ; } } timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS; /* 延遲100 毫秒, 這里使用隨機時間可能會好一點,可以防止饑餓進程的出現,即,當同時到達多個進程, 只會有一個進程獲得鎖,其他的都用同樣的頻率進行嘗試,后面有來了一些進行,也以同樣的頻率申請鎖,這將可能導致前面來的鎖得不到滿足. 使用隨機的等待時間可以一定程度上保證公平性 */ Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS); } return false; } /** * Acqurired lock release. */ public synchronized void unlock() { if (locked) { redisTemplate.delete(lockKey); locked = false ; } } } |
調用:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
RedisLock lock = new RedisLock(redisTemplate, key, 10000 , 20000 ); try { if (lock.lock()) { //需要加鎖的代碼 } } } catch (InterruptedException e) { e.printStackTrace(); } finally { //為了讓分布式鎖的算法更穩鍵些,持有鎖的客戶端在解鎖之前應該再檢查一次自己的鎖是否已經超時,再去做DEL操作,因為可能客戶端因為某個耗時的操作而掛起, //操作完的時候鎖因為超時已經被別人獲得,這時就不必解鎖了。 ————這里沒有做 lock.unlock(); } |
六、一些問題
1、為什么不直接使用expire設置超時時間,而將時間的毫秒數其作為value放在redis中?
如下面的方式,把超時的交給redis處理:
1
2
3
4
5
|
lock(key, expireSec){ isSuccess = setnx key if (isSuccess) expire key expireSec } |
這種方式貌似沒什么問題,但是假如在setnx后,redis崩潰了,expire就沒有執行,結果就是死鎖了。鎖永遠不會超時。
2、為什么前面的鎖已經超時了,還要用getSet去設置新的時間戳的時間獲取舊的值,然后和外面的判斷超時時間的時間戳比較呢?
因為是分布式的環境下,可以在前一個鎖失效的時候,有兩個進程進入到鎖超時的判斷。如:
C0超時了,還持有鎖,C1/C2同時請求進入了方法里面
C1/C2獲取到了C0的超時時間
C1使用getSet方法
C2也執行了getSet方法
假如我們不加 oldValueStr.equals(currentValueStr) 的判斷,將會C1/C2都將獲得鎖,加了之后,能保證C1和C2只能一個能獲得鎖,一個只能繼續等待。
注意:這里可能導致超時時間不是其原本的超時時間,C1的超時時間可能被C2覆蓋了,但是他們相差的毫秒及其小,這里忽略了。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:http://www.cnblogs.com/0201zcr/p/5942748.html