最近做一個(gè)小程序開發(fā)任務(wù),主要負(fù)責(zé)后臺部分開發(fā);根據(jù)項(xiàng)目需求,需要實(shí)現(xiàn)三個(gè)定時(shí)任務(wù):
1>定時(shí)更新微信token,需要2小時(shí)更新一次;
2>商品定時(shí)上線;
3>定時(shí)檢測后臺服務(wù)是否存活;
使用python去實(shí)現(xiàn)這三個(gè)任務(wù),這里需要使用定時(shí)相關(guān)知識點(diǎn);
python實(shí)現(xiàn)定點(diǎn)與定時(shí)任務(wù)方式比較多,找到下面四中實(shí)現(xiàn)方式,每個(gè)方式都有自己應(yīng)用場景;下面來快速介紹python中常用的定時(shí)任務(wù)實(shí)現(xiàn)方式:
1>循環(huán)+sleep;
2>線程模塊中timer類;
3>schedule模塊;
4>定時(shí)框架:apscheduler
在開始之前先設(shè)定一個(gè)任務(wù)(這樣不用依賴外部環(huán)境):
1:定時(shí)或者定點(diǎn)監(jiān)測cpu與內(nèi)存使用率;
2:將時(shí)間,cpu,內(nèi)存使用情況保存到日志文件;
先來實(shí)現(xiàn)系統(tǒng)監(jiān)測功能:
準(zhǔn)備工作:安裝psutil:pip install psutil
功能實(shí)現(xiàn)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
#psutil:獲取系統(tǒng)信息模塊,可以獲取cpu,內(nèi)存,磁盤等的使用情況 import psutil import time import datetime #logfile:監(jiān)測信息寫入文件 def monitorsystem(logfile = none): #獲取cpu使用情況 cpuper = psutil.cpu_percent() #獲取內(nèi)存使用情況:系統(tǒng)內(nèi)存大小,使用內(nèi)存,有效內(nèi)存,內(nèi)存使用率 mem = psutil.virtual_memory() #內(nèi)存使用率 memper = mem.percent #獲取當(dāng)前時(shí)間 now = datetime.datetime.now() ts = now.strftime( '%y-%m-%d %h:%m:%s' ) line = f '{ts} cpu:{cpuper}%, mem:{memper}%' print (line) if logfile: logfile.write(line) |
代碼運(yùn)行結(jié)果:
2019-03-21 14:23:41 cpu:0.6%, mem:77.2%
接下來我們要實(shí)現(xiàn)定時(shí)監(jiān)測,比如3s監(jiān)測一下系統(tǒng)資源使用情況。
最簡單使用方式:sleep
這種方式最簡單,直接使用while+sleep就可以實(shí)現(xiàn):
1
2
3
4
5
6
|
def loopmonitor(): while true: monitorsystem() #2s檢查一次 time.sleep( 3 ) loopmonitor() |
輸出結(jié)果:
2019-03-21 14:28:42 cpu:1.5%, mem:77.6%
2019-03-21 14:28:45 cpu:1.6%, mem:77.6%
2019-03-21 14:28:48 cpu:1.4%, mem:77.6%
2019-03-21 14:28:51 cpu:1.4%, mem:77.6%
2019-03-21 14:28:54 cpu:1.3%, mem:77.6%
這種方式存在問題:只能處理單個(gè)定時(shí)任務(wù)。
如果你依然在編程的世界里迷茫,不知道自己的未來規(guī)劃
自己是一名高級python開發(fā)工程師,從基礎(chǔ)的python腳本到web開發(fā)、爬蟲、django、數(shù)據(jù)挖掘等,零基礎(chǔ)到項(xiàng)目實(shí)戰(zhàn)的資料都有整理。送給每一位python的小伙伴!分享一些學(xué)習(xí)的方法和需要注意的小細(xì)節(jié)
又來了新任務(wù):需要每秒監(jiān)測網(wǎng)絡(luò)收發(fā)字節(jié),代碼實(shí)現(xiàn)如下:
1
2
3
4
5
6
7
8
9
10
11
|
def monitornetwork(logfile = none): #獲取網(wǎng)絡(luò)收信息 netinfo = psutil.net_io_counters() #獲取當(dāng)前時(shí)間 now = datetime.datetime.now() ts = now.strftime( '%y-%m-%d %h:%m:%s' ) line = f '{ts} bytessent={netinfo.bytes_sent}, bytesrecv={netinfo.bytes_recv}' print (line) if logfile: logfile.write(line) monitornetwork() |
代碼執(zhí)行結(jié)果:
2019-03-21 14:47:21 bytessent=169752183, bytesrecv=1107900973
如果我們同時(shí)在while循環(huán)中監(jiān)測兩個(gè)任務(wù)會有等待問題,不能每秒監(jiān)測網(wǎng)絡(luò)情況。
timer實(shí)現(xiàn)方式
timer最基本理解就是定時(shí)器,我們可以啟動(dòng)多個(gè)定時(shí)任務(wù),這些定時(shí)器任務(wù)是異步執(zhí)行,所以不存在等待順序執(zhí)行問題。
先來看timer的基本使用:
導(dǎo)入:from threading import timer
主要方法:
timer方法 | 說明 |
---|---|
timer(interval, function, args=none, kwargs=none) | 創(chuàng)建定時(shí)器 |
cancel() | 取消定時(shí)器 |
start() | 使用線程方式執(zhí)行 |
join(self, timeout=none) | 等待線程執(zhí)行結(jié)束 |
定時(shí)器只能執(zhí)行一次,如果需要重復(fù)執(zhí)行,需要重新添加任務(wù);
我們先來看基本使用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
from threading import timer #記錄當(dāng)前時(shí)間 print (datetime.datetime.now()) #3s執(zhí)行一次 stimer = timer( 3 , monitorsystem) #1s執(zhí)行一次 ntimer = timer( 1 , monitornetwork) #使用線程方式執(zhí)行 stimer.start() ntimer.start() #等待結(jié)束 stimer.join() ntimer.join() #記錄結(jié)束時(shí)間 print (datetime.datetime.now()) |
輸出結(jié)果:
2019-03-21 15:13:36.739798
2019-03-21 15:13:37 bytessent=171337324, bytesrecv=1109002349
2019-03-21 15:13:39 cpu:1.4%, mem:93.2%
2019-03-21 15:13:39.745187
可以看到,花費(fèi)時(shí)間為3s,但是我們想要做的是每秒監(jiān)控網(wǎng)絡(luò)狀態(tài);如何處理。
timer只能執(zhí)行一次,所以執(zhí)行完成之后需要再次添加任務(wù),我們對代碼進(jìn)行修改:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
from threading import timer import psutil import time import datetime def monitorsystem(logfile = none): cpuper = psutil.cpu_percent() mem = psutil.virtual_memory() memper = mem.percent now = datetime.datetime.now() ts = now.strftime( '%y-%m-%d %h:%m:%s' ) line = f '{ts} cpu:{cpuper}%, mem:{memper}%' print (line) if logfile: logfile.write(line) #啟動(dòng)定時(shí)器任務(wù),每三秒執(zhí)行一次 timer( 3 , monitorsystem).start() def monitornetwork(logfile = none): netinfo = psutil.net_io_counters() now = datetime.datetime.now() ts = now.strftime( '%y-%m-%d %h:%m:%s' ) line = f '{ts} bytessent={netinfo.bytes_sent}, bytesrecv={netinfo.bytes_recv}' print (line) if logfile: logfile.write(line) #啟動(dòng)定時(shí)器任務(wù),每秒執(zhí)行一次 timer( 1 , monitornetwork).start() monitorsystem() monitornetwork() |
執(zhí)行結(jié)果:
2019-03-21 15:18:21 cpu:1.5%, mem:93.2%
2019-03-21 15:18:21 bytessent=171376522, bytesrecv=1109124678
2019-03-21 15:18:22 bytessent=171382215, bytesrecv=1109128294
2019-03-21 15:18:23 bytessent=171384278, bytesrecv=1109129702
2019-03-21 15:18:24 cpu:1.9%, mem:93.2%
2019-03-21 15:18:24 bytessent=171386341, bytesrecv=1109131110
2019-03-21 15:18:25 bytessent=171388527, bytesrecv=1109132600
2019-03-21 15:18:26 bytessent=171390590, bytesrecv=1109134008
從時(shí)間中可以看到,這兩個(gè)任務(wù)可以同時(shí)進(jìn)行不存在等待問題。
timer的實(shí)質(zhì)是使用線程方式去執(zhí)行任務(wù),每次執(zhí)行完后會銷毀,所以不必?fù)?dān)心資源問題。
調(diào)度模塊:schedule
schedule是一個(gè)第三方輕量級的任務(wù)調(diào)度模塊,可以按照秒,分,小時(shí),日期或者自定義事件執(zhí)行時(shí)間;
安裝方式:
pip install schedule
我們來看一個(gè)例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
import datetime import schedule import time def func(): now = datetime.datetime.now() ts = now.strftime( '%y-%m-%d %h:%m:%s' ) print ( 'do func time :' ,ts) def func2(): now = datetime.datetime.now() ts = now.strftime( '%y-%m-%d %h:%m:%s' ) print ( 'do func2 time:' ,ts) def tasklist(): #清空任務(wù) schedule.clear() #創(chuàng)建一個(gè)按秒間隔執(zhí)行任務(wù) schedule.every( 1 ).seconds.do(func) #創(chuàng)建一個(gè)按2秒間隔執(zhí)行任務(wù) schedule.every( 2 ).seconds.do(func2) #執(zhí)行10s for i in range ( 10 ): schedule.run_pending() time.sleep( 1 ) tasklist() |
執(zhí)行結(jié)果:
do func time : 2019-03-22 08:51:38
do func2 time: 2019-03-22 08:51:39
do func time : 2019-03-22 08:51:39
do func time : 2019-03-22 08:51:40
do func2 time: 2019-03-22 08:51:41
do func time : 2019-03-22 08:51:41
do func time : 2019-03-22 08:51:42
do func2 time: 2019-03-22 08:51:43
do func time : 2019-03-22 08:51:43
do func time : 2019-03-22 08:51:44
do func2 time: 2019-03-22 08:51:45
do func time : 2019-03-22 08:51:45
do func time : 2019-03-22 08:51:46
執(zhí)行過程分析:
>1>因?yàn)樵趈upyter下執(zhí)行,所以先將schedule任務(wù)清空;
>2>按時(shí)間間在schedule中隔添加任務(wù);
>3>這里按照秒間隔添加func,按照兩秒間隔添加func2;
>4>schedule添加任務(wù)后,需要查詢?nèi)蝿?wù)并執(zhí)行任務(wù);
>5>為了防止占用資源,每秒查詢到點(diǎn)任務(wù),然后順序執(zhí)行;
第5個(gè)順序執(zhí)行怎么理解,我們修改func函數(shù),里面添加time.sleep(2)
然后只執(zhí)行func工作,輸出結(jié)果:
do func time : 2019-03-22 09:00:59
do func time : 2019-03-22 09:01:02
do func time : 2019-03-22 09:01:05
可以看到時(shí)間間隔為3s,為什么不是1s?
因?yàn)檫@個(gè)按照順序執(zhí)行,func休眠2s,循環(huán)任務(wù)查詢休眠1s,所以會存在這個(gè)問題。
在我們使用這種方式執(zhí)行任務(wù)需要注意這種阻塞現(xiàn)象。
我們看下schedule模塊常用使用方法:
1
2
3
4
5
6
7
8
9
10
11
12
|
#schedule.every(1)創(chuàng)建job, seconds.do(func)按秒間隔查詢并執(zhí)行 schedule.every( 1 ).seconds.do(func) #添加任務(wù)按分執(zhí)行 schedule.every( 1 ).minutes.do(func) #添加任務(wù)按天執(zhí)行 schedule.every( 1 ).days.do(func) #添加任務(wù)按周執(zhí)行 schedule.every().weeks.do(func) #添加任務(wù)每周1執(zhí)行,執(zhí)行時(shí)間為下周一這一時(shí)刻時(shí)間 schedule.every().monday.do(func) #每周1,1點(diǎn)15開始執(zhí)行 schedule.every().monday.at( "12:00" ).do(job) |
這種方式局限性:如果工作任務(wù)回非常耗時(shí)就會影響其他任務(wù)執(zhí)行。我們可以考慮使用并發(fā)機(jī)制配置這個(gè)模塊使用。
任務(wù)框架apscheduler
apscheduler是python的一個(gè)定時(shí)任務(wù)框架,用于執(zhí)行周期或者定時(shí)任務(wù),
可以基于日期、時(shí)間間隔,及類似于linux上的定時(shí)任務(wù)crontab類型的定時(shí)任務(wù);
該該框架不僅可以添加、刪除定時(shí)任務(wù),還可以將任務(wù)存儲到數(shù)據(jù)庫中,實(shí)現(xiàn)任務(wù)的持久化,使用起來非常方便。
安裝方式:pip install apscheduler
apscheduler組件及簡單說明:
1>triggers(觸發(fā)器):觸發(fā)器包含調(diào)度邏輯,每一個(gè)作業(yè)有它自己的觸發(fā)器
2>job stores(作業(yè)存儲):用來存儲被調(diào)度的作業(yè),默認(rèn)的作業(yè)存儲器是簡單地把作業(yè)任務(wù)保存在內(nèi)存中,支持存儲到mongodb,redis數(shù)據(jù)庫中
3> executors(執(zhí)行器):執(zhí)行器用來執(zhí)行定時(shí)任務(wù),只是將需要執(zhí)行的任務(wù)放在新的線程或者線程池中運(yùn)行
4>schedulers(調(diào)度器):調(diào)度器是將其它部分聯(lián)系在一起,對使用者提供接口,進(jìn)行任務(wù)添加,設(shè)置,刪除。
來看一個(gè)簡單例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import time from apscheduler.schedulers.blocking import blockingscheduler def func(): now = datetime.datetime.now() ts = now.strftime( '%y-%m-%d %h:%m:%s' ) print ( 'do func time :' ,ts) def func2(): #耗時(shí)2s now = datetime.datetime.now() ts = now.strftime( '%y-%m-%d %h:%m:%s' ) print ( 'do func2 time:' ,ts) time.sleep( 2 ) def dojob(): #創(chuàng)建調(diào)度器:blockingscheduler scheduler = blockingscheduler() #添加任務(wù),時(shí)間間隔2s scheduler.add_job(func, 'interval' , seconds = 2 , id = 'test_job1' ) #添加任務(wù),時(shí)間間隔5s scheduler.add_job(func2, 'interval' , seconds = 3 , id = 'test_job2' ) scheduler.start() dojob() |
輸出結(jié)果:
do func time : 2019-03-22 10:32:20
do func2 time: 2019-03-22 10:32:21
do func time : 2019-03-22 10:32:22
do func time : 2019-03-22 10:32:24
do func2 time: 2019-03-22 10:32:24
do func time : 2019-03-22 10:32:26
輸出結(jié)果中可以看到:任務(wù)就算是有延時(shí),也不會影響其他任務(wù)執(zhí)行。
apscheduler框架提供豐富接口去實(shí)現(xiàn)定時(shí)任務(wù),可以去參考官方文檔去查看使用方式。
最后選擇:
簡單總結(jié)上面四種定時(shí)定點(diǎn)任務(wù)實(shí)現(xiàn):
1:循環(huán)+sleep方式適合簡答測試,
2:timer可以實(shí)現(xiàn)定時(shí)任務(wù),但是對定點(diǎn)任務(wù)來說,需要檢查當(dāng)前時(shí)間點(diǎn);
3:schedule可以定點(diǎn)定時(shí)執(zhí)行,但是需要在循環(huán)中檢測任務(wù),而且存在阻塞;
4:apscheduler框架更加強(qiáng)大,可以直接在里面添加定點(diǎn)與定時(shí)任務(wù);
綜合考慮,決定使用apscheduler框架,實(shí)現(xiàn)簡單,只需要直接創(chuàng)建任務(wù),并將添加到調(diào)度器中即可。
總結(jié)
以上所述是小編給大家介紹的python3實(shí)現(xiàn)定時(shí)任務(wù)的四種方式,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時(shí)回復(fù)大家的。在此也非常感謝大家對服務(wù)器之家網(wǎng)站的支持!
如果你覺得本文對你有幫助,歡迎轉(zhuǎn)載,煩請注明出處,謝謝!原文鏈接:https://www.jianshu.com/p/1425977657eb