Python中怎么利用backoff实现轮询

发布时间:2021-08-07 16:03:11 作者:Leah
来源:亿速云 阅读:142

本篇文章给大家分享的是有关Python中怎么利用backoff实现轮询,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

backoff 模块简介及安装

这个模块主要提供了是一个装饰器,用于装饰函数,使得它在遇到某些条件时会重试(即反复执行被装饰的函数)。通常适用于我们在获取一些不可靠资源,比如会间歇性故障的资源等。

此外,装饰器支持正常的同步方法,也支持异步asyncio代码。

backoff 模块的安装也很简单,通过 pip 即可安装完成:

pip install backoff

backoff 用法及简单源码分析

backoff 提供两个主要的装饰器,通过 backoff. 调用,通过提示我们可以看到这两个装饰器,分别是:

backoff.on_predicatebackoff.on_exception

通过 github 查看 backoff 的源码,源码目录 backoff/_decorator.py,定义如下:

def on_predicate(wait_gen, predicate=operator.not_, max_tries=None, max_time=None, jitter=full_jitter, on_success=None, on_backoff=None, on_giveup=None, logger='backoff',
 **wait_gen_kwargs):
 # 省略具体代码
 # 每个参数的定义在源码中都给出了明确的解释
 pass
def on_exception(wait_gen,
 exception, max_tries=None, max_time=None, jitter=full_jitter, giveup=lambda e: False, on_success=None, on_backoff=None, on_giveup=None, logger='backoff',
 **wait_gen_kwargs):
 # 省略具体代码
 # 每个参数的定义在源码中都给出了明确的解释
 pass

Python中怎么利用backoff实现轮询

可以看到,定义了很多的参数,这些参数在源码中都给出了比较详细的解释,这里做简单的介绍:

首先,wait_gen:表示每次循环等待的时长,以秒为单位。它的类型是一个生成器,在 backoff 中内置了三个生成器。我们查看下源码,目录为 backoff/_wait_gen.py。我们取其中一个的详细实现来看下:

# 省略实现代码# base * factor * ndef expo(base=2, factor=1, max_value=None):
 """Generator for exponential decay.
 Args:
 base: The mathematical base of the exponentiation operation
 factor: Factor to multiply the exponentation by.
 max_value: The maximum value to yield. Once the value in the
 true exponential sequence exceeds this, the value
 of max_value will forever after be yielded.
 """
 n = 0
 while True:
 a = factor * base ** n if max_value is None or a < max_value: yield a
 n += 1
 else: yield max_value# 通过斐波那契数列控制def fibo(max_value=None):
 pass# 常量数值def constant(interval=1):
 pass

从源码不难看出,通过一些策略,每次 yield 返回不同的数值,这些数值就是重试等待秒数。当然因为这个参数类型是生成器,显然我们也是可以自定义的。同时我们会发现每个 wait_gen 都是参数控制的,所以我们理应是可以修改这个参数的初始值的。

显然,wait_gen_kwargs就是用来传递这些参数的,它是通过可变关键字参数控制的,可以直接用 key=value 的形式进行传参,简单示例如下:

@backoff.on_predicate(backoff.constant, interval=5)def main3():
 print("time is {} retry...".format(time.time()))

predict 与 exception。这两个相对比较简单,predict 接受一个函数,当这个函数返回 True 时会进行重试,否则停止,同时这个函数接受一个参数,这个参数的值是被装饰函数的返回值。这个参数的默认值是:operator._not。这个函数的源码如下:

def not_(a): "Same as not a."
 return not a

所以默认返回的是 not 被装饰函数的返回值。如果当被装饰函数并没有返回值时,返回 True,会进行重试。

示例代码如下:

import backoffimport time@backoff.on_predicate(backoff.fibo)def test2():
 print("time is {}, retry...".format(time.time()))if __name__ == "__main__":
 test2()# 等价于:# 必须接受一个参数,这个参数的值是被装饰函数的返回值def condition(r):
 return True
 @backoff.on_predicate(backoff.fibo, condition)def test2():
 print("time is {}, retry...".format(time.time()))if __name__ == "__main__":
 test2()

执行结果如下:

$ python3 backoff_test.pytime is 1571801845.834578, retry...time is 1571801846.121314, retry...time is 1571801846.229812, retry...time is 1571801846.533237, retry...time is 1571801849.460303, retry...time is 1571801850.8974788, retry...time is 1571801856.498335, retry...time is 1571801861.56931, retry...time is 1571801872.701226, retry...time is 1571801879.198495, retry...
...

需要注意几点:

而 exception 则是接受异常类型的实例,可以是单个异常,也可以是元组形式的多个异常。简单示例如下:

import timeimport randomimport backofffrom collections import dequeclass MyException(Exception):
 def __init__(self, message, status):
 super().__init__(message, status)
 self.message = message
 self.status = statusclass MyException2(Exception):
 pass@backoff.on_exception(backoff.expo, (MyException, MyException2))def main():
 random_num = random.randint(0, 9)
 print("retry...and random num is {}".format(random_num)) if random_num % 2 == 0: raise MyException("my exception", int("1000" + str(random_num))) raise MyException2()

max_tries 与 max_time 也比较简单,分别代表最大重试次数与最长重试时间。这里就不做演示了。

@backoff.on_exception 中的 giveup,它接受一个异常实例,通过对这个实例做一些条件判断,达到判断是否需要继续循环的目的。如果返回 True,则结束,反之继续。默认值一直是返回 False,即会一直循环。示例如下:

import randomimport backoffclass MyException(Exception):
 def __init__(self, message, status):
 super().__init__(message, status)
 self.message = message
 self.status = statusdef exception_status(e):
 print('exception status code is {}'.format(e.status)) return e.status % 2 == 0
 @backoff.on_exception(backoff.expo, MyException, giveup=exception_status)def main():
 random_num = random.randint(0, 9)
 print("retry...and random num is {}".format(random_num)) raise MyException("my exception", int("1000" + str(random_num)))if __name__ == "__main__":
 main()

运行结果:

retry...and random num is 5exception status code is 10005retry...and random num is 0exception status code is 10000# 会再走一遍 raise 的代码,所以异常仍然会抛出来
Traceback (most recent call last):
 File "backoff_test.py", line 36, in <module>
 main()
 File "/Users/ruoru/code/python/exercise/.venv/lib/python3.7/site-packages/backoff/_sync.py", line 94, in retry
 ret = target(*args, **kwargs)
 File "backoff_test.py", line 32, in main
 raise MyException("my exception", int("1000" + str(random_num)))
__main__.MyException: ('my exception', 10000)

需要注意两点:

on_success、on_backoff 与 on_giveup 这三个是一类的参数,用于做事件处理:

总结来说,max_tries 和 max_time 这种直接控制结束的,调用的是 on_giveup,而 exception 参数也是通过返回 True 则程序就结束,它是用来控制程序结束的,所以也会调用 on_giveup。而 predicate 参数返回 True 则程序继续,它是用来控制程序是否继续徨的,所以当它结束时,调用的是 on_success。

实验代码如下:

'''
@Author: ruoru
@Date: 2019-10-22 15:30:32
@LastEditors: ruoru
@LastEditTime: 2019-10-23 14:37:13
@Description: backoff
'''import timeimport randomimport backoffclass MyException(Exception):
 def __init__(self, status, message):
 super().__init__(status, message)
 self.status = status
 self.message = messagedef backoff_hdlr(details):
 print("Backing off {wait:0.1f} seconds afters {tries} tries "
 "calling function {target} with args {args} and kwargs "
 "{kwargs}".format(**details))def success_hdlr(details):
 print("Success offafters {tries} tries "
 "calling function {target} with args {args} and kwargs "
 "{kwargs}".format(**details))def giveup_hdlr(details):
 print("Giveup off {tries} tries "
 "calling function {target} with args {args} and kwargs "
 "{kwargs}".format(**details))@backoff.on_predicate(
 backoff.constant, # 当 random num 不等 10009 则继续
 # 当 random_num 等于 10009 后,会调用 on_success
 lambda x: x != 10009,
 on_success=success_hdlr,
 on_backoff=backoff_hdlr,
 on_giveup=giveup_hdlr,
 max_time=2)def main():
 num = random.randint(10000, 10010)
 print("time is {}, num is {}, retry...".format(time.time(), num)) return num@backoff.on_exception(
 backoff.constant,
 MyException, # 当 Exception 实例对象的 status 为 10009 成立时退出
 # 当条件成立时,调用的是 on_giveup
 giveup=lambda e: e.status == 10009,
 on_success=success_hdlr,
 on_backoff=backoff_hdlr,
 on_giveup=giveup_hdlr,
 )def main2():
 num = random.randint(10000, 10010)
 print("time is {}, num is {}, retry...".format(time.time(), num)) # 如果是通过这个条件成立退出,调用的是 on_success
 if num == 10010: return
 raise MyException(num, "hhh")if __name__ == "__main__": #main()
 main2()

logger 参数,很显然就是用来控制日志输出的,这里不做详细介绍。copy 官方文档的一个示例:

my_logger = logging.getLogger('my_logger')
my_handler = logging.StreamHandler()
my_logger.add_handler(my_handler)
my_logger.setLevel(logging.ERROR)
@backoff.on_exception(backoff.expo,
 requests.exception.RequestException,
 logger=my_logger)# ...

最后一个参数,jitter,开始也不是很明白这个参数的作用,文档的解释如下:

jitter: A function of the value yielded by wait_gen returning the actual time to wait. This distributes wait times stochastically in order to avoid timing collisions across concurrent clients. Wait times are jittered by default using the full_jitter function. Jittering may be disabled altogether by passing jitter=None.

有点晕,于是去看了下源码,明白了用法,截取关键源码如下:

# backoff/_decorator.pydef on_predicate(wait_gen,
 predicate=operator.not_,
 max_tries=None,
 max_time=None,
 jitter=full_jitter,
 on_success=None,
 on_backoff=None,
 on_giveup=None,
 logger='backoff',
 **wait_gen_kwargs):
 pass # 省略
 # 因为没有用到异步,所以会进到这里
 if retry is None:
 retry = _sync.retry_predicate# backoff/_sync# 分析可以看到有一句获取下次 wait 时长seconds = _next_wait(wait, jitter, elapsed, max_time_)# backoff/_commondef _next_wait(wait, jitter, elapsed, max_time): value = next(wait) try: if jitter is not None: seconds = jitter(value) else: seconds = value
 except TypeError:
 warnings.warn( "Nullary jitter function signature is deprecated. Use "
 "unary signature accepting a wait value in seconds and "
 "returning a jittered version of it.",
 DeprecationWarning,
 stacklevel=2,
 ) seconds = value + jitter() # don't sleep longer than remaining alloted max_time
 if max_time is not None: seconds = min(seconds, max_time - elapsed) return seconds

看前面几行代码应该就会比较清晰了,如果 jitter 为 None,则会使用第一个参数返回的 value 值,而如果使用了,则会在这个 value 值上再做一次算法,默认为 full_jitter(value)。backoff/_jitter.py 提供了两个算法,代码不长,贴上来看看:

import randomdef random_jitter(value):
 """Jitter the value a random number of milliseconds.
 This adds up to 1 second of additional time to the original value.
 Prior to backoff version 1.2 this was the default jitter behavior.
 Args:
 value: The unadulterated backoff value.
 """
 return value + random.random()def full_jitter(value):
 """Jitter the value across the full range (0 to value).
 This corresponds to the "Full Jitter" algorithm specified in the
 AWS blog's post on the performance of various jitter algorithms.
 (http://www.awsarchitectureblog.com/2015/03/backoff.html)
 Args:
 value: The unadulterated backoff value.
 """
 return random.uniform(0, value)

以上就是Python中怎么利用backoff实现轮询,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注亿速云行业资讯频道。

推荐阅读:
  1. PHP怎么实现AJAX长轮询
  2. python轮询机制怎么控制led

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

python backoff

上一篇:python中怎么利用Dijkstra算法求最短路径

下一篇:如何解决某些HTML字符打不出来的问题

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》