當你有多個進程或線程訪問相同的數據時,競爭條件是一個威脅。本文探討了在發現競爭條件后如何測試它們。
Incrmnt
你在一個名為“Incrmnt”的火熱新創公司工作,該公司只做一件事情,并且做得比較好。
你展示一個全局計數器和一個加號,用戶可以點擊加號,此時計數器加一。這太簡單了,而且容易使人上癮。毫無疑問這就是接下來的大事情。
投資者們爭先恐后的進入了董事會,但你有一個大問題。
競爭條件
在你的內測中,Abraham和Belinda是如此的興奮,以至于每個人都點了100次加號按鈕。你的服務器日志顯示了200次請求,但計數器卻顯示為173。很明顯,有一些請求沒有被加上。
先將“Incrmnt變成了一坨屎”的新聞拋到腦后,你檢查下代碼(本文用到的所有代碼都能在Github上找到)。
1
2
3
4
5
6
7
8
9
10
|
# incrmnt.py import db def increment(): count = db.get_count() new_count = count + 1 db.set_count(new_count) return new_count |
你的Web服務器使用多進程處理流量請求,所以這個函數能在不同的線程中同時執行。如果你沒掌握好時機,將會發生:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
# 線程1和線程2在不同的進程中同時執行 # 為了展示的目的,在這里并排放置 # 在垂直方向分開它們,以說明在每個時間點上執行什么代碼 # Thread 1(線程1) # Thread 2(線程2) def increment(): def increment(): # get_count returns 0 count = db.get_count() # get_count returns 0 again count = db.get_count() new_count = count + 1 # set_count called with 1 db.set_count(new_count) new_count = count + 1 # set_count called with 1 again db.set_count(new_count) |
所以盡管增加了兩次計數,但最終只增加了1。
你知道你可以修改這個代碼,變為線程安全的,但是在你那么做之前,你還想寫一個測試證明競爭的存在。
重現競爭
在理想情況下,測試應該盡可能的重現上面的場景。競爭的關鍵因素是:
?兩個 get_count 調用必須在兩個 set_count 調用之前執行,從而使得兩個線程中的計數具有相同的值。
set_count 調用,什么時候執行都沒關系,只要它們都在 get_count 調用之后即可。
簡單起見,我們試著重現這個嵌套的情形。這里整 個Thread 2 在 Thread 1 的首個 get_count 調用之后執行:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
# Thread 1 # Thread 2 def increment(): # get_count returns 0 count = db.get_count() def increment(): # get_count returns 0 again count = db.get_count() # set_count called with 1 new_count = count + 1 db.set_count(new_count) # set_count called with 1 again new_count = count + 1 db.set_count(new_count) |
before_after 是一個庫,它提供了幫助重現這種情形的工具。它可以在一個函數之前或之后插入任意代碼。
before_after 依賴于 mock 庫,它用來補充一些功能。如果你不熟悉 mock,我建議閱讀一些優秀的文檔。文檔中特別重要的部分是 Where To Patch。
我們希望,Thread 1 調用 get_count 后,執行全部的 Thread 2 ,之后恢復執行 Thread 1。
我們的測試代碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
# test_incrmnt.py import unittest import before_after import db import incrmnt class TestIncrmnt(unittest.TestCase): def setUp( self ): db.reset_db() def test_increment_race( self ): # after a call to get_count, call increment with before_after.after( 'incrmnt.db.get_count' , incrmnt.increment): # start off the race with a call to increment incrmnt.increment() count = db.get_count() self .assertEqual(count, 2 ) |
在首次 get_count 調用之后,我們使用 before_after 的上下文管理器 after 來插入另外一個 increment 的調用。
在默認情況下,before_after只調用一次 after 函數。在這個特殊的情況下這是很有用的,因為否則的話堆棧會溢出(increment調用get_count,get_coun t也調用 increment,increment 又調用get_count…)。
這個測試失敗了,因為計數等于1,而不是2?,F在我們有一個重現了競爭條件的失敗測試,一起來修復。
防止競爭
我們將要使用一個簡單的鎖機制來減緩競爭。這顯然不是理想的解決方案,更好的解決方法是使用原子更新進行數據存儲——但這種方法能更好地示范 before_after 在測試多線程應用程序上的作用。
在 incrmnt.py 中添加一個新函數:
1
2
3
4
5
|
# incrmnt.py def locking_increment(): with db.get_lock(): return increment() |
它保證在同一時間只有一個線程對計數進行讀寫操作。如果一個線程試圖獲取鎖,而鎖被另外一個線程保持,將會引發 CouldNotLock 異常。
現在我們增加這樣一個測試:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
# test_incrmnt.py def test_locking_increment_race( self ): def erroring_locking_increment(): # Trying to get a lock when the other thread has it will cause a # CouldNotLock exception - catch it here or the test will fail with self .assertRaises(db.CouldNotLock): incrmnt.locking_increment() with before_after.after( 'incrmnt.db.get_count' , erroring_locking_increment): incrmnt.locking_increment() count = db.get_count() self .assertEqual(count, 1 ) |
現在在同一時間,就只有一個線程能夠增加計數了。
減緩競爭
我們這里還有一個問題,通過上邊這種方式,如果兩個請求沖突,一個不會被登記。為了緩解這個問題,我們可以讓 increment 重新鏈接服務器(有一個簡潔的方式,就是用類似 funcy retry 的東西):
1
2
3
4
5
6
7
8
|
# incrmnt.py def retrying_locking_increment(): @retry (tries = 5 , errors = db.CouldNotLock) def _increment(): return locking_increment() return _increment() |
當我們需要比這種方法提供的更大規模的操作時,可以將 increment 作為一個原子更新或事務轉移到我們的數據庫中,讓其在遠離我們的應用程序的地方承擔責任。
總結
Incrmnt 現在不存在競爭了,人們可以愉快地點擊一整天,而不用擔心自己不被計算在內。
這是一個簡單的例子,但是 before_after 可以用于更復雜的競爭條件,以確保你的函數能正確地處理所有情形。能夠在單線程環境中測試和重現競爭條件是一個關鍵,它能讓你更確定你正在正確地處理競爭條件。