為了避免由于一些網絡或等其他不可控因素,而引起的功能性問題。比如在發送請求時,會因為網絡不穩定,往往會有請求超時的問題。
這種情況下,我們通常會在代碼中加入重試的代碼。重試的代碼本身不難實現,但如何寫得優雅、易用,是我們要考慮的問題。
這里要給大家介紹的是一個第三方庫-Tenacity(標題中的重試機制并并不準確,它不是Python的內置模塊,因此并不能稱之為機制),它實現了幾乎我們可以使用到的所有重試場景,比如:
·在什么情況下才進行重試?
·重試幾次呢?
·重試多久后結束?
·每次重試的間隔多長呢?
·重試失敗后的回調?
在使用它之前,先要安裝它
$pipinstalltenacity
1.最基本的重試
無條件重試,重試之間無間隔
fromtenacityimportretry
@retry
deftest_retry():
print("等待重試,重試無間隔執行...")
raiseException
test_retry()
無條件重試,但是在重試之前要等待2秒
fromtenacityimportretry,wait_fixed
@retry(wait=wait_fixed(2))
deftest_retry():
print("等待重試...")
raiseException
test_retry()
2.設置停止基本條件
只重試7次
fromtenacityimportretry,stop_after_attempt
@retry(stop=stop_after_attempt(7))
deftest_retry():
print("等待重試...")
raiseException
test_retry()
重試10秒后不再重試
fromtenacityimportretry,stop_after_delay
@retry(stop=stop_after_delay(10))
deftest_retry():
print("等待重試...")
raiseException
test_retry()
或者上面兩個條件滿足一個就結束重試
fromtenacityimportretry,stop_after_delay,stop_after_attempt
@retry(stop=(stop_after_delay(10)|stop_after_attempt(7)))
deftest_retry():
print("等待重試...")
raiseException
test_retry()
3.設置何時進行重試
在出現特定錯誤/異常(比如請求超時)的情況下,再進行重試
fromrequestsimportexceptions
fromtenacityimportretry,retry_if_exception_type
@retry(retry=retry_if_exception_type(exceptions.Timeout))
deftest_retry():
print("等待重試...")
raiseexceptions.Timeout
test_retry()
在滿足自定義條件時,再進行重試。
如下示例,當test_retry函數返回值為False時,再進行重試
fromtenacityimportretry,stop_after_attempt,retry_if_result
defis_false(value):
returnvalueisFalse
@retry(stop=stop_after_attempt(3),
retry=retry_if_result(is_false))
deftest_retry():
returnFalse
test_retry()
4.重試后錯誤重新拋出
當出現異常后,tenacity會進行重試,若重試后還是失敗,默認情況下,往上拋出的異常會變成RetryError,而不是最根本的原因。
因此可以加一個參數(reraise=True),使得當重試失敗后,往外拋出的異常還是原來的那個。
fromtenacityimportretry,stop_after_attempt
@retry(stop=stop_after_attempt(7),reraise=True)
deftest_retry():
print("等待重試...")
raiseException
test_retry()
5.設置回調函數
當最后一次重試失敗后,可以執行一個回調函數
fromtenacityimport*
defreturn_last_value(retry_state):
print("執行回調函數")
returnretry_state.outcome.result()#表示返回原函數的返回值
defis_false(value):
returnvalueisFalse
@retry(stop=stop_after_attempt(3),
retry_error_callback=return_last_value,
retry=retry_if_result(is_false))
deftest_retry():
print("等待重試中...")
returnFalse
print(test_retry())
輸出如下
等待重試中...
等待重試中...
等待重試中...
執行回調函數
False
以上內容為大家介紹了Python的重試機制,希望對大家有所幫助,如果想要了解更多Python相關知識,請關注IT培訓機構:千鋒教育。http://www.kei0345678.cn/