前言
和網絡 IO 一樣,文件讀寫同樣是一個費事的操作。
默認情況下,Python 使用的是系統的阻塞讀寫。這意味著在 asyncio 中如果調用了
1
2
|
f = file ( 'xx' ) f.read() |
會阻塞事件循環。
本篇簡述如何用 asyncio.Future 對象來封裝文件的異步讀寫。
代碼在 GitHub。目前僅支持 Linux。
阻塞和非阻塞
首先需要將文件的讀寫改為非阻塞的形式。在非阻塞情況下,每次調用 read 都會立即返回,如果返回值為空,則意味著文件操作還未完成,反之則是讀取的文件內容。
阻塞和非阻塞的切換與操作系統有關,所以本篇暫時只寫了 Linux 版本。如果有過 Unix 系統編程經驗,會發現 Python 的操作是類似的。
1
2
3
|
flag = fcntl.fcntl( self .fd, fcntl.F_GETFL) if fcntl.fcntl( self .fd, fcntl.F_SETFL, flag | os.O_NONBLOCK) ! = 0 : raise OSError() |
Future 對象
Future 對象類似 Javascript 中的 Promise 對象。它是一個占位符,其值會在將來被計算出來。我們可以使用
result = await future
在 future 得到值之后返回。而使用
future.set_result(xxx)
就可以設置 future 的值,也意味著 future 可以被返回了。await 操作符會自動調用 future.result() 來得到值。
loop.call_soon
通過 loop.call_soon 方法可以將一個函數插入到事件循環中。
至此,我們的異步文件讀寫思路也就出來了。通過 loop.call_soon 調用非阻塞讀寫文件的函數。若一次文件讀寫沒有完成,則計算剩余所學讀寫的字節數,并再次插入事件循環直至讀寫完畢。
可以發現其就是把傳統 Unix 編程里,非阻塞文件讀寫的 while 循環換成了 asyncio 的事件循環。
下面是這一過程的示意代碼。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
def read_step( self , future, n, total): res = self .fd.read(n) if res is None : self .loop.call_soon( self .read_step, future, n, total) return if not res: # EOF future.set_result(bytes( self .rbuffer)) return self .rbuffer.extend(res) self .loop.call_soon( self .read_step, future, self .BLOCK_SIZE, total) def read( self , n = - 1 ): future = asyncio.Future(loop = self .loop) self .rbuffer.clear() self .loop.call_soon( self .read_step, future, min ( self .BLOCK_SIZE, n), n) return future |