大佬教程收集整理的这篇文章主要介绍了如何在Python中的函数中添加超时,大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
这个问题是在9年前提出的,从那以后,Python和我的经验清单都发生了很大变化。在查看了标准库中的其他API并希望部分复制其中一个API之后,编写了follow模块以达到与问题中发布的API相似的目的。
#! /usr/bin/env python3
@R_502_5565@ _thread
@R_502_5565@ abc as _abc
@R_502_5565@ collections as _collections
@R_502_5565@ enum as _enum
@R_502_5565@ math as _math
@R_502_5565@ multiprocessing as _multiprocessing
@R_502_5565@ operator as _operator
@R_502_5565@ queue as _queue
@R_502_5565@ signal as _signal
@R_502_5565@ sys as _sys
@R_502_5565@ time as _time
__all__ = (
'Executor',
'get_timeout',
'set_timeout',
'submit',
'map_',
'shutdown'
)
class _Base(Metaclass=_abc.ABCMeta):
__slots__ = (
'__timeout',
)
@_abc.abstractmethod
def __init__(self, timeout):
self.timeout = _math.inf if timeout is None else timeout
def get_timeout(self):
return self.__timeout
def set_timeout(self, value):
if not isinstance(value, (float, int)):
raise TypeError('value must be of type float or int')
if value <= 0:
raise ValueError('value must be greater than zero')
self.__timeout = value
timeout = property(get_timeout, set_timeout)
def _run_and_catch(fn, args, kwargs):
# noinspection PyPep8,PybroadException
try:
return false, fn(*args, **kwargs)
except:
return True, _sys.exc_info()[1]
def _run(fn, args, kwargs, queuE):
queue.put_Nowait(_run_and_catch(fn, args, kwargs))
class _State(_enum.IntEnum):
PENDING = _enum.auto()
RUNNING = _enum.auto()
CANCELLED = _enum.auto()
FINISHED = _enum.auto()
ERROR = _enum.auto()
def _run_and_catch_loop(iterable, *args, **kwargs):
exception = None
for fn in iterable:
error, value = _run_and_catch(fn, args, kwargs)
if error:
exception = value
if exception:
raise exception
class _Future(_BasE):
__slots__ = (
'__queue',
'__process',
'__start_time',
'__callBACks',
'__result',
'__mutex'
)
def __init__(self, timeout, fn, args, kwargs):
super().__init__(timeout)
self.__queue = _multiprocessing.Queue(1)
self.__process = _multiprocessing.Process(
target=_run,
args=(fn, args, kwargs, self.__queuE),
daemon=True
)
self.__start_time = _math.inf
self.__callBACks = _collections.deque()
self.__result = True, TimeoutError()
self.__mutex = _thread.allocate_lock()
@property
def __state(self):
pID, exitcode = self.__process.pID, self.__process.exitcode
return (_State.PENDING if pID is None else
_State.RUNNING if exitcode is None else
_State.CANCELLED if exitcode == -_signal.SIGTERM else
_State.FINISHED if exitcode == 0 else
_State.ERROR)
def __repr__(self):
root = f'{type(self).__name__} at {ID(self)} state={self.__state.namE}'
if self.__state < _State.CANCELLED:
return f'<{root}>'
error, value = self.__result
suffix = f'{"raised" if error else "returned"} {type(value).__name__}'
return f'<{root} {suffix}>'
def __consume_callBACks(self):
while self.__callBACks:
yIEld self.__callBACks.popleft()
def __invoke_callBACks(self):
self.__process.join()
_run_and_catch_loop(self.__consume_callBACks(), self)
def cancel(self):
self.__process.terminate()
self.__invoke_callBACks()
def __auto_cancel(self):
elapsed_time = _time.perf_counter() - self.__start_time
if elapsed_time > self.timeout:
self.cancel()
return elapsed_time
def cancelled(self):
self.__auto_cancel()
return self.__state is _State.CANCELLED
def running(self):
self.__auto_cancel()
return self.__state is _State.RUNNING
def done(self):
self.__auto_cancel()
return self.__state > _State.RUNNING
def __handle_result(self, error, value):
self.__result = error, value
self.__invoke_callBACks()
def __ensure_termination(self):
with self.__mutex:
elapsed_time = self.__auto_cancel()
if not self.__queue.empty():
self.__handle_result(*self.__queue.get_Nowait())
elif self.__state < _State.CANCELLED:
remaining_time = self.timeout - elapsed_time
if remaining_time == _math.inf:
remaining_time = None
try:
result = self.__queue.get(True, remaining_timE)
except _queue.Empty:
self.cancel()
else:
self.__handle_result(*result)
def result(self):
self.__ensure_termination()
error, value = self.__result
if error:
raise value
return value
def exception(self):
self.__ensure_termination()
error, value = self.__result
if error:
return value
def add_done_callBACk(self, fn):
if self.done():
fn(self)
else:
self.__callBACks.append(fn)
def _set_running_or_notify_cancel(self):
if self.__state is _State.PENDING:
self.__process.start()
self.__start_time = _time.perf_counter()
else:
self.cancel()
class Executor(_BasE):
__slots__ = (
'__futures',
)
def __init__(self, timeout=NonE):
super().__init__(timeout)
self.__futures = set()
def submit(self, fn, *args, **kwargs):
future = _Future(self.timeout, fn, args, kwargs)
self.__futures.add(futurE)
future.add_done_callBACk(self.__futures.removE)
# noinspection PyProtectedMember
future._set_running_or_notify_cancel()
return future
@staticmethod
def __cancel_futures(iterablE):
_run_and_catch_loop(map(_operator.attrgetter('cancel'), iterablE))
def map(self, fn, *iterables):
futures = tuple(self.submit(fn, *args) for args in zip(*iterables))
def result_iterator():
future_iterator = iter(futures)
try:
for future in future_iterator:
yIEld future.result()
finally:
self.__cancel_futures(future_iterator)
return result_iterator()
def shutdown(self):
self.__cancel_futures(froZenset(self.__futures))
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown()
return false
_executor = Executor()
get_timeout = _executor.get_timeout
set_timeout = _executor.set_timeout
submit = _executor.submit
map_ = _executor.map
shutdown = _executor.shutdown
del _executor
过去,已经进行了许多尝试以在Python中添加超时功能,以便在指定的时间限制到期时,等待的代码可以继续运行。不幸的是,以前的配方要么允许正在运行的功能继续运行并消耗资源,要么使用特定于平台的线程终止方法终止该功能。该Wiki的目的是针对这个问题开发跨平台的答案,许多程序员必须针对各种编程项目解决该问题。
#! /usr/bin/env python
"""Provide way to add timeout specifications to arbitrary functions.
There are many ways to add a timeout to a function,but no solution
is both cross-platform and capable of terminaTing the procedure. This
module use the multiprocessing module to solve both of those problems."""
################################################################################
__author__ = 'Stephen "Zero" Chappell <Noctis.Skytower@gmail.com>'
__date__ = '11 February 2010'
__version__ = '$Revision: 3 $'
################################################################################
import inspect
import sys
import time
import multiprocessing
################################################################################
def add_timeout(function,limit=60):
"""Add a timeout parameter to a function and return it.
it is illegal to pass anything other than a function as the first
parameter. If the limit is not given,it gets a default value equal
to one minute. The function is wrapped and returned to the caller."""
assert inspect.isfunction(function)
if limit <= 0:
raise ValueError()
return _Timeout(function,limit)
class NotReadyError(Exception): pass
################################################################################
def _target(queue,function,*args,**kwargs):
"""Run a function with arguments and return output via a queue.
This is a Helper function for the Process created in _Timeout. It runs
the function with positional arguments and keyword arguments and then
returns the function's output by way of a queue. If an exception gets
raised,it is returned to _Timeout to be raised by the value property."""
try:
queue.put((True,function(*args,**kwargs)))
except:
queue.put((false,sys.exc_info()[1]))
class _Timeout:
"""Wrap a function and add a timeout (limit) attribute to it.
Instances of this class are automatically generated by the add_timeout
function defined above. Wrapping a function allows asynchronous calls
to be made and termination of execution after a timeout has passed."""
def __init__(self,limit):
"""Initialize instance in preparation for being called."""
self.__limit = limit
self.__function = function
self.__timeout = time.clock()
self.__process = multiprocessing.Process()
self.__queue = multiprocessing.Queue()
def __call__(self,**kwargs):
"""Execute the embedded function object asynchronously.
The function given to the constructor is transparently called and
requires that "ready" bE intermittently polled. If and when it is
True,the "value" property may then be checked for returned data."""
self.cancel()
self.__queue = multiprocessing.Queue(1)
args = (self.__queue,self.__function) + args
self.__process = multiprocessing.Process(target=_target,args=args,kwargs=kwargs)
self.__process.daemon = True
self.__process.start()
self.__timeout = self.__limit + time.clock()
def cancel(self):
"""Terminate any possible execution of the embedded function."""
if self.__process.is_alive():
self.__process.terminate()
@property
def ready(self):
"""Read-only property inDicaTing status of "value" property."""
if self.__queue.full():
return True
elif not self.__queue.empty():
return True
elif self.__timeout < time.clock():
self.cancel()
else:
return false
@property
def value(self):
"""Read-only property containing data returned from function."""
if self.ready is True:
flag,load = self.__queue.get()
if flag:
return load
raise load
raise NotReadyError()
def __get_limit(self):
return self.__limit
def __set_limit(self,value):
if value <= 0:
raise ValueError()
self.__limit = value
limit = property(__get_limit,__set_limit,doc="Property for controlling the value of the timeout.")
编辑: 这段代码是为Python 3.x编写的,并非为装饰类方法而设计。该@H_750_6@multiprocessing模块并非旨在跨流程边界修改类实例。
以上是大佬教程为你收集整理的如何在Python中的函数中添加超时全部内容,希望文章能够帮你解决如何在Python中的函数中添加超时所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。