本文将为您提供关于如何在Python中的函数中添加超时的详细介绍,我们还将为您解释python添加函数的相关知识,同时,我们还将为您提供关于python中的函数(陆续添加)、从Python中的函数中访
本文将为您提供关于如何在Python中的函数中添加超时的详细介绍,我们还将为您解释python 添加函数的相关知识,同时,我们还将为您提供关于python中的函数(陆续添加)、从Python中的函数中访问* args、从python中的另一个函数中查找任何没有出现的函数、在python 3中的函数中创建动态命名的变量/在python 3中了解exec / eval / locals的实用信息。
本文目录一览:- 如何在Python中的函数中添加超时(python 添加函数)
- python中的函数(陆续添加)
- 从Python中的函数中访问* args
- 从python中的另一个函数中查找任何没有出现的函数
- 在python 3中的函数中创建动态命名的变量/在python 3中了解exec / eval / locals
如何在Python中的函数中添加超时(python 添加函数)
过去,已经进行了许多尝试以在Python中添加超时功能,以便在指定的时间限制到期时,等待的代码可以继续运行。不幸的是,以前的配方要么允许正在运行的功能继续运行并消耗资源,要么使用特定于平台的线程终止方法终止该功能。该Wiki的目的是针对这个问题开发跨平台的答案,许多程序员必须针对各种编程项目解决该问题。
#! /usr/bin/env python"""Provide way to add timeout specifications to arbitrary functions.There are many ways to add a timeout to a function, but no solutionis both cross-platform and capable of terminating the procedure. Thismodule use the multiprocessing module to solve both of those problems."""################################################################################__author__ = ''Stephen "Zero" Chappell <Noctis.Skytower@gmail.com>''__date__ = ''11 February 2010''__version__ = ''$Revision: 3 $''################################################################################import inspectimport sysimport timeimport multiprocessing################################################################################def add_timeout(function, limit=60): """Add a timeout parameter to a function and return it. It is illegal to pass anything other than a function as the first parameter. If the limit is not given, it gets a default value equal to one minute. The function is wrapped and returned to the caller.""" assert inspect.isfunction(function) if limit <= 0: raise ValueError() return _Timeout(function, limit)class NotReadyError(Exception): pass################################################################################def _target(queue, function, *args, **kwargs): """Run a function with arguments and return output via a queue. This is a helper function for the Process created in _Timeout. It runs the function with positional arguments and keyword arguments and then returns the function''s output by way of a queue. If an exception gets raised, it is returned to _Timeout to be raised by the value property.""" try: queue.put((True, function(*args, **kwargs))) except: queue.put((False, sys.exc_info()[1]))class _Timeout: """Wrap a function and add a timeout (limit) attribute to it. Instances of this class are automatically generated by the add_timeout function defined above. Wrapping a function allows asynchronous calls to be made and termination of execution after a timeout has passed.""" def __init__(self, function, limit): """Initialize instance in preparation for being called.""" self.__limit = limit self.__function = function self.__timeout = time.clock() self.__process = multiprocessing.Process() self.__queue = multiprocessing.Queue() def __call__(self, *args, **kwargs): """Execute the embedded function object asynchronously. The function given to the constructor is transparently called and requires that "ready" be intermittently polled. If and when it is True, the "value" property may then be checked for returned data.""" self.cancel() self.__queue = multiprocessing.Queue(1) args = (self.__queue, self.__function) + args self.__process = multiprocessing.Process(target=_target, args=args, kwargs=kwargs) self.__process.daemon = True self.__process.start() self.__timeout = self.__limit + time.clock() def cancel(self): """Terminate any possible execution of the embedded function.""" if self.__process.is_alive(): self.__process.terminate() @property def ready(self): """Read-only property indicating status of "value" property.""" if self.__queue.full(): return True elif not self.__queue.empty(): return True elif self.__timeout < time.clock(): self.cancel() else: return False @property def value(self): """Read-only property containing data returned from function.""" if self.ready is True: flag, load = self.__queue.get() if flag: return load raise load raise NotReadyError() def __get_limit(self): return self.__limit def __set_limit(self, value): if value <= 0: raise ValueError() self.__limit = value limit = property(__get_limit, __set_limit, doc="Property for controlling the value of the timeout.")
编辑: 这段代码是为Python 3.x编写的,并非为装饰类方法而设计。该multiprocessing
模块并非旨在跨流程边界修改类实例。
答案1
小编典典这个问题是在9年前提出的,从那以后,Python和我的经验清单都发生了很大变化。在查看了标准库中的其他API并希望部分复制其中一个API之后,编写了follow模块以达到与问题中发布的API相似的目的。
异步
#! /usr/bin/env python3import _threadimport abc as _abcimport collections as _collectionsimport enum as _enumimport math as _mathimport multiprocessing as _multiprocessingimport operator as _operatorimport queue as _queueimport signal as _signalimport sys as _sysimport time as _time__all__ = ( ''Executor'', ''get_timeout'', ''set_timeout'', ''submit'', ''map_'', ''shutdown'')class _Base(metaclass=_abc.ABCMeta): __slots__ = ( ''__timeout'', ) @_abc.abstractmethod def __init__(self, timeout): self.timeout = _math.inf if timeout is None else timeout def get_timeout(self): return self.__timeout def set_timeout(self, value): if not isinstance(value, (float, int)): raise TypeError(''value must be of type float or int'') if value <= 0: raise ValueError(''value must be greater than zero'') self.__timeout = value timeout = property(get_timeout, set_timeout)def _run_and_catch(fn, args, kwargs): # noinspection PyPep8,PyBroadException try: return False, fn(*args, **kwargs) except: return True, _sys.exc_info()[1]def _run(fn, args, kwargs, queue): queue.put_nowait(_run_and_catch(fn, args, kwargs))class _State(_enum.IntEnum): PENDING = _enum.auto() RUNNING = _enum.auto() CANCELLED = _enum.auto() FINISHED = _enum.auto() ERROR = _enum.auto()def _run_and_catch_loop(iterable, *args, **kwargs): exception = None for fn in iterable: error, value = _run_and_catch(fn, args, kwargs) if error: exception = value if exception: raise exceptionclass _Future(_Base): __slots__ = ( ''__queue'', ''__process'', ''__start_time'', ''__callbacks'', ''__result'', ''__mutex'' ) def __init__(self, timeout, fn, args, kwargs): super().__init__(timeout) self.__queue = _multiprocessing.Queue(1) self.__process = _multiprocessing.Process( target=_run, args=(fn, args, kwargs, self.__queue), daemon=True ) self.__start_time = _math.inf self.__callbacks = _collections.deque() self.__result = True, TimeoutError() self.__mutex = _thread.allocate_lock() @property def __state(self): pid, exitcode = self.__process.pid, self.__process.exitcode return (_State.PENDING if pid is None else _State.RUNNING if exitcode is None else _State.CANCELLED if exitcode == -_signal.SIGTERM else _State.FINISHED if exitcode == 0 else _State.ERROR) def __repr__(self): root = f''{type(self).__name__} at {id(self)} state={self.__state.name}'' if self.__state < _State.CANCELLED: return f''<{root}>'' error, value = self.__result suffix = f''{"raised" if error else "returned"} {type(value).__name__}'' return f''<{root} {suffix}>'' def __consume_callbacks(self): while self.__callbacks: yield self.__callbacks.popleft() def __invoke_callbacks(self): self.__process.join() _run_and_catch_loop(self.__consume_callbacks(), self) def cancel(self): self.__process.terminate() self.__invoke_callbacks() def __auto_cancel(self): elapsed_time = _time.perf_counter() - self.__start_time if elapsed_time > self.timeout: self.cancel() return elapsed_time def cancelled(self): self.__auto_cancel() return self.__state is _State.CANCELLED def running(self): self.__auto_cancel() return self.__state is _State.RUNNING def done(self): self.__auto_cancel() return self.__state > _State.RUNNING def __handle_result(self, error, value): self.__result = error, value self.__invoke_callbacks() def __ensure_termination(self): with self.__mutex: elapsed_time = self.__auto_cancel() if not self.__queue.empty(): self.__handle_result(*self.__queue.get_nowait()) elif self.__state < _State.CANCELLED: remaining_time = self.timeout - elapsed_time if remaining_time == _math.inf: remaining_time = None try: result = self.__queue.get(True, remaining_time) except _queue.Empty: self.cancel() else: self.__handle_result(*result) def result(self): self.__ensure_termination() error, value = self.__result if error: raise value return value def exception(self): self.__ensure_termination() error, value = self.__result if error: return value def add_done_callback(self, fn): if self.done(): fn(self) else: self.__callbacks.append(fn) def _set_running_or_notify_cancel(self): if self.__state is _State.PENDING: self.__process.start() self.__start_time = _time.perf_counter() else: self.cancel()class Executor(_Base): __slots__ = ( ''__futures'', ) def __init__(self, timeout=None): super().__init__(timeout) self.__futures = set() def submit(self, fn, *args, **kwargs): future = _Future(self.timeout, fn, args, kwargs) self.__futures.add(future) future.add_done_callback(self.__futures.remove) # noinspection PyProtectedMember future._set_running_or_notify_cancel() return future @staticmethod def __cancel_futures(iterable): _run_and_catch_loop(map(_operator.attrgetter(''cancel''), iterable)) def map(self, fn, *iterables): futures = tuple(self.submit(fn, *args) for args in zip(*iterables)) def result_iterator(): future_iterator = iter(futures) try: for future in future_iterator: yield future.result() finally: self.__cancel_futures(future_iterator) return result_iterator() def shutdown(self): self.__cancel_futures(frozenset(self.__futures)) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.shutdown() return False_executor = Executor()get_timeout = _executor.get_timeoutset_timeout = _executor.set_timeoutsubmit = _executor.submitmap_ = _executor.mapshutdown = _executor.shutdowndel _executor
python中的函数(陆续添加)
1.函数:len()
作用:返回字符串、列表、字典、元组等长度
语法:len(str)
参数:
str:要计算的字符串、列表、字典、元组等
返回值:字符串、列表、字典、元组等元素的长度
实例
1、计算字符串的长度:
>>> s = "hello word" >>> len(s) 10
2、计算列表的元素个数:
>>> str= [‘h‘,‘e‘,‘l‘,‘o‘] >>> len(str) 5
3、计算字典的总长度(即键值对总数):
>>> dict = {‘num‘:777,‘name‘:"anne"} >>> len(dict) 2
4、计算元组元素个数:
>>> t = (‘G‘,‘o‘,‘d‘) >>> len(t) 4
2.range()函数
range()是python内置函数它能返回一系列连续增加的整数,它的工作方式类似于分片,可以生成一个列表对象。
range函数大多数时常出现在for循环中,在for循环中可做为索引使用。其实它也可以出现在任何需要整数列表的环境中,
在python 3.0中range函数是一个迭代器。range()函数内只有一个参数,则表示会产生从0开始计数的整数列表:
实例:
>>> range(5) [0,1,2,3,4] #python 返回值
python range()中,当传入两个参数时,则将第一个参数做为起始位,第二个参数为结束位:
>>> range(0,6) [0,4,5]
range()函数内可以填入三个参数,第三个参数是步进值(步进值默认为1):
>>> range(0,10,2) [0,6,8]
range函数的参数和结果也并非一定要是正数或是递增的,好比下面两个例子:
>>> range(-4,4) [-4,-3,-2,-1,3] >>> >>> range(4,-4,-1) [4,-3]
range()在for循环中的作用及技巧
range可以根据给定的次数,重复动作,来看一个range与for循环最简单的例子:
>>> x = ‘playpython‘ >>> for i in x: ... print i,... p l a y p y t h o n >>> range(len(x)) [0,5,7,8,9] >>> len(x) 10 >>> for i in range(len(x)): ... print x[i],... p l a y p y t h o n >>> (来源于知乎)
从Python中的函数中访问* args
def nodefunction(self,*args): return self[1] + self[2]
基本上我要做的是抓住通过参数传入的数据.我只是坚持使用* args时引用函数内部参数的语法.
解决方法
def nodeMethod(self,*args): return args[0],args[1]
你是这个意思吗?
请注意,“args”并没有什么特别之处.您可以使用任何变量名称.重要的是*运算符.
>>> class Node(object): ... def nodeMethod(self,*cornucopia): ... return cornucopia[0],cornucopia[1] ... >>> n = Node() >>> n.nodeMethod(1,2,3) (1,2)
仍然,“args”是最惯用的变量名称;如果没有其他明显的理由,我就不会使用其他任何东西.
从python中的另一个函数中查找任何没有出现的函数
通常,这是不可能的– MPersistence
仅在之前或之后 MDR
运行。至少需要修改MDR
来跟踪迭代本身,这引发了如何在两个函数之间安全地交换结果的问题。理想情况下,MPersistence
应该通过独立计算数字乘法链来工作。
为避免代码重复,请使用一个函数来计算数字根和持久性。根据需要提供便利功能,以直接返回每个组件。
def mdrp(n: int):
"""
Compute the multiplicative persistence and digital root of ``n``
"""
if n < 10:
return 0,n
else:
persistence,digital_root = mdrp(prodDigits(n))
return persistence + 1,digital_root
def MDR(n):
return mdrp(n)[1]
def MPersistence(n):
return mdrp(n)[0]
,
您不需要其他函数来计算函数调用。 MisterMiyagi的想法是在函数本身内部有一个计数器,并在每次函数调用时都返回持久性和数字根,最后将结果拆包。
我认为更简单的方法是将计数器作为全局变量,并在每次调用时在函数内增加其值:
def mdr(n):
if n<10:
return n
else:
return mdr(prodDigits(n))
globals()['persistence'] += 1
persistence = 0
print(mdr(86),persistence) # 6 3
这种方法看起来更简单,但要记住一件事:每次调用该函数之前都必须初始化计数器:
print(msr(341),persistence) # 2 5 Wrong result
persistence = 0
print(msr(341),persistence) # 2 2 Right result
这不是纯函数,因为它依赖并更改非局部变量。
在python 3中的函数中创建动态命名的变量/在python 3中了解exec / eval / locals
首先,我要说的是,我阅读了许多与创建动态命名变量类似的主题,但是它们大多与Python 2有关,或者假定您正在使用类。是的,我阅读了Python
2和Python 3中exec函数的行为。
我也知道在99%的时间里创建动态命名的变量是一个坏主意,而字典是获得的方法,但是我只想知道是否仍然可行,以及exec和locals在python
3中的工作原理。
我想显示一些示例代码来说明我的问题(斐波纳契计算斐波纳契数,ListOfLetters提供[“ A”,“ B”,…]):
def functionname(): for index, buchstabe in enumerate(ListOfLetters.create_list("A", "K"), 1): exec("{} = {}".format(buchstabe, fibonacci(index)) ) #A = 1, B = 1, C = 2, D = 3, E = 5,... print(index, buchstabe, eval(buchstabe)) #works nicely, e.g. prints "4 D 3" print(locals()) #pritns all locals: {''B'': 1, ''A'': 1, ''index'': 11, ''C'': 2, ''H'': 21, ''K'': 89, ... print(locals()[''K'']) #prints 89 as it should print(eval("K")) #prints 89 as it should print(K) #NameError: name ''K'' is not defined
因此,至少就我目前的理解而言,的行为存在一些不一致之处locals()
,因为它包含所添加的变量名,exec()
但该变量在函数中不可用。
如果有人可以解释一下并告诉您这是设计使然还是语言中确实存在不一致之处,我将不胜感激。是的,我知道locals
不应该修改它,但是我没有修改它,我在打电话给exec()
…
答案1
小编典典当您不确定某个东西为什么能在Python中正常工作时,它通常可以帮助将您困惑的行为放入函数中,然后使用该dis
模块将其从Python字节码中分解出来。
让我们从一个简单的代码版本开始:
def foo(): exec("K = 89") print(K)
如果运行foo()
,您将得到与更复杂的函数相同的异常:
>>> foo()Traceback (most recent call last): File "<pyshell#167>", line 1, in <module> foo() File "<pyshell#166>", line 3, in foo print(K)NameError: name ''K'' is not defined
让我们分解一下,看看为什么:
>>> import dis>>> dis.dis(foo) 2 0 LOAD_GLOBAL 0 (exec) 3 LOAD_CONST 1 (''K = 89'') 6 CALL_FUNCTION 1 (1 positional, 0 keyword pair) 9 POP_TOP 3 10 LOAD_GLOBAL 1 (print) 13 LOAD_GLOBAL 2 (K) 16 CALL_FUNCTION 1 (1 positional, 0 keyword pair) 19 POP_TOP 20 LOAD_CONST 0 (None) 23 RETURN_VALUE
您需要注意的操作是标为“
13”的操作。这是编译器K
在函数的最后一行(print(K)
)中查找的地方。它使用的LOAD_GLOBAL
操作码失败,因为“
K”不是全局变量名称,而是它在我们的locals()
字典中的值(由exec
调用添加)。
如果我们说服编译器将其K
视为局部变量(在运行之前为其提供值exec
),那么它将知道不寻找不存在的全局变量怎么办?
def bar(): K = None exec("K = 89") print(K)
如果运行该函数,将不会出现错误,但不会输出期望值:
>>> bar()None
让我们分解一下原因:
>>> dis.dis(bar) 2 0 LOAD_CONST 0 (None) 3 STORE_FAST 0 (K) 3 6 LOAD_GLOBAL 0 (exec) 9 LOAD_CONST 1 (''K = 89'') 12 CALL_FUNCTION 1 (1 positional, 0 keyword pair) 15 POP_TOP 4 16 LOAD_GLOBAL 1 (print) 19 LOAD_FAST 0 (K) 22 CALL_FUNCTION 1 (1 positional, 0 keyword pair) 25 POP_TOP 26 LOAD_CONST 0 (None) 29 RETURN_VALUE
注意在“ 3”和“
19”处使用的操作码。Python编译器使用STORE_FAST
和LOAD_FAST
将局部变量的值K
放入插槽0,然后将其取回。使用编号的插槽比从像这样的字典中插入和获取值的速度要快得多locals()
,这就是Python编译器为函数中的所有局部变量访问而这样做的原因。您不能通过修改返回的字典来覆盖插槽中的局部变量locals()
(exec
如果您不将其传递给它的命名空间的字典,也一样)。
确实,让我们尝试函数的第三个版本,locals
当我们将K
其定义为常规局部变量时,可以再次浏览:
def baz(): K = None exec("K = 89") print(locals())
89
这次您也不会在输出中看到!
>>> baz(){"K": None}
函数文档中解释了您看到旧K
值的原因:locals()
更新并返回代表当前本地符号表的字典。
该语句K
未更改存储本地变量值的插槽,该exec
语句仅修改locals()
字典。locals()
再次调用时,Python使用插槽中的值“更新”字典,用替换存储在其中的值exec
。
这就是为什么文档继续说:
注意: 此字典的内容不得修改;更改可能不会影响解释器使用的局部变量和自由变量的值。
您的exec
呼叫正在修改locals()
字典,并且您发现以后的代码并不总是看到其更改。
今天关于如何在Python中的函数中添加超时和python 添加函数的介绍到此结束,谢谢您的阅读,有关python中的函数(陆续添加)、从Python中的函数中访问* args、从python中的另一个函数中查找任何没有出现的函数、在python 3中的函数中创建动态命名的变量/在python 3中了解exec / eval / locals等更多相关知识的信息可以在本站进行查询。
本文标签: