最近很多小伙伴都在问在多个Python多处理队列上“选择”?和python%s多个这两个问题,那么本篇文章就来给大家详细解答一下,同时本文还将给你拓展ELI5Python模块“选择”和“选择器”、py
最近很多小伙伴都在问在多个Python多处理队列上“选择”?和python %s 多个这两个问题,那么本篇文章就来给大家详细解答一下,同时本文还将给你拓展ELI5 Python 模块“选择”和“选择器”、python – 在主进程中异步等待多处理队列、python – 在多处理池中处理工作者死亡、python-3.x – 多处理队列子类问题等相关知识,下面开始了哦!
本文目录一览:- 在多个Python多处理队列上“选择”?(python %s 多个)
- ELI5 Python 模块“选择”和“选择器”
- python – 在主进程中异步等待多处理队列
- python – 在多处理池中处理工作者死亡
- python-3.x – 多处理队列子类问题
在多个Python多处理队列上“选择”?(python %s 多个)
等待(不旋转)直到两个(多处理)队列中的任何一个都可用的最佳方法是什么(两者都驻留在同一系统上)?
ELI5 Python 模块“选择”和“选择器”
如何解决ELI5 Python 模块“选择”和“选择器”?
我一直在尝试使用“socket”模块和编写小型服务器。现在我 偶然发现了“选择”和“选择器”库。我一直试图围绕他们所做的事情进行思考,但还没有找到任何对我来说足够愚蠢的解释。 有人可以把它给我吗?
非常感谢。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)
python – 在主进程中异步等待多处理队列
我有以下场景:多个工作进程将有关其当前状态的事件发送到事件调度程序.如果我们在主进程中,则此事件调度程序需要处理所有事件,或者如果我们处于工作进程中,则表示主进程的事件调度程序处理这些事件.
这里的主要关键是事件处理也必须在主进程的主线程中,所以我不能在线程内运行一段时间的True循环并等待来自那里的工作进程的消息.
所以我拥有的是:
import asyncio
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import current_process,Process,Queue
from threading import current_thread
from time import sleep
def get_q(q):
print("Waiting for the queue ({} / {})\n".format(current_thread().name,current_process().name))
return q.get()
async def message_q(q):
while True:
f = loop.run_in_executor(None,get_q,q)
await f
if f.result() is None:
print("Done")
return;
print("Got the result ({} / {})".format(current_thread().name,current_process().name))
print("Result is: {}\n".format(f.result()))
async def something_else():
while True:
print("Something else\n")
await asyncio.sleep(2)
def other_process(q):
for i in range(5):
print("Putting something in the queue ({})".format(current_process().name))
q.put(i)
sleep(1)
q.put(None)
q = Queue()
Process(target=other_process,args=(q,),daemon=True).start()
loop = asyncio.get_event_loop()
loop.set_default_executor(ThreadPoolExecutor(max_workers=1))
asyncio.ensure_future(message_q(q))
asyncio.ensure_future(something_else())
loop.run_until_complete(asyncio.sleep(6))
other_process()是一个示例性的工作进程,它使用队列来发信号通知主进程,该进程运行事件循环来处理东西并等待队列上的任何数据.在实际情况中,此过程将向事件调度程序发出信号,该事件调度程序随后将处理队列消息传递,将消息传递给主进程事件调度程序,但在此我稍微简化了一下.
但是,我对此并不十分满意.一次又一次地将get_q()提交给ThreadPoolExecutor会产生更多的开销,并且不像一个长时间运行的线程那样干净.此外,只要队列中没有其他数据,就会等待f没有最佳和阻塞,这会阻止事件循环退出.我的解决方法是在worker完成后发送None,如果队列中没有None,则退出message_q().
有没有更好的方法来实现这个?性能非常关键,我想将Queue对象保持在事件调度程序的本地,而不是将其传递给管理工作进程的代码(或者需要调用某种finalize()方法).
asyncio.ensure_future(message_q())
在其__aenter __()方法中,在其__aexit __()方法中将None添加到队列中以关闭message_q()中的无限循环.
然后,可以在process-spawning代码部分的async with语句中使用上下文管理器,从而无需手动调用shutdown方法.但是,在确保message_q()协程允许上下文管理器初始化队列侦听器之后,建议在__aenter __()方法中调用await asyncio.sleep(0).否则,不会立即调用message_q().这本身不是问题(因为队列无论如何都被填充),但是它会延迟事件转发,直到代码中出现下一个等待.
应该使用ProcesspoolExecutor和loop.run_in_executor()生成进程,因此等待进程不会阻止事件循环.
您可能还希望使用JoinableQueue来确保在退出上下文管理器之前处理了所有事件,而不是使用Queue.
python – 在多处理池中处理工作者死亡
我有一个简单的服务器:
from multiprocessing import Pool,TimeoutError
import time
import os
if __name__ == '__main__':
# start worker processes
pool = Pool(processes=1)
while True:
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid,()) # runs in *only* one process
try:
print(res.get(timeout=1)) # prints the PID of that process
except TimeoutError:
print('worker timed out')
time.sleep(5)
pool.close()
print("Now the pool is closed and no longer available")
pool.join()
print("Done")
如果我运行这个,我会得到类似的东西:
47292
47292
然后我在服务器运行时杀了47292.启动了新的工作进程,但服务器的输出是:
47292
47292
worker timed out
worker timed out
worker timed out
池仍在尝试向旧工作进程发送请求.
我已经完成了在服务器和工作程序中捕获信号的一些工作,我可以获得稍微好一点的行为,但服务器似乎仍在等待关闭死亡的孩子(即.pool.join()永远不会结束) .
处理工人死亡的正确方法是什么?
如果没有工人死亡,那么从服务器进程中正常关闭工作人员似乎才有效.
(在Python 3.4.4上,但如果有帮助的话,很高兴升级.)
更新:
有趣的是,如果使用processes = 2创建池并且您杀死一个工作进程,等待几秒钟并终止另一个进程,则不会发生此工作者超时问题.但是,如果您快速连续杀死两个工作进程,则“工作超时”问题会再次出现.
也许相关的是,当问题发生时,终止服务器进程将使工作进程保持运行.
所以实际上没有办法杀死一个工人,并确保你的游泳池之后仍然没问题,因为你可能会陷入僵局.
multiprocessing.Pool不处理工人死亡.您可以尝试使用concurrent.futures.ProcesspoolExecutor(使用稍微不同的API)来处理默认情况下进程的失败.当进程在ProcesspoolExecutor中死亡时,整个执行程序将关闭并返回brokenProcesspool错误.
请注意,此实现中还有其他死锁,应在loky
中修复.(免责声明:我是此库的维护者).此外,loky允许您使用ReusablePoolExecutor和方法_resize调整现有执行程序的大小.如果您有兴趣,请告诉我,从这个软件包开始,我可以为您提供一些帮助. (我意识到我们仍然需要对文档进行一些工作…… 0_0)
python-3.x – 多处理队列子类问题
#!/usr/bin/env python #whaaaaa!? from multiprocessing import Queue class BufferQueue(Queue): '''A thread/process safe queue for append/popleft operations with the import buffer.''' def __init__(self,**kwargs): super(BufferQueue,self).__init__(**kwargs) def consume(self,lim): '''Consume up to but no more than lim elements and return them in a new list,cleaning up the buffer. @params lim -- the maximum (limit) to consume from the list. If less items exist in the list then that's fine too. ''' lim = len(queue) if len(queue) < lim else lim return [self.popleft() for i in range(lim)]
测试这个(我将其拆分,以便我不会拉其他任何东西)
| => ./tests/wtf_queue.py Traceback (most recent call last): File "./tests/wtf_queue.py",line 10,in <module> class BufferQueue(Queue): TypeError: method expected 2 arguments,got 3
编辑/更新:
解决方法
>>> from multiprocessing import Queue >>> type(Queue) <class 'method'>
正如您所看到的,不是“类型”,您将使用它来进行子类化.
如果要实现自己的队列,可以查看queue.Queue
编辑:
如果要从多处理子类化队列,请改用multiprocessing.queues.Queue,这是multiprocessing.Queue()返回的对象的类型.
今天关于在多个Python多处理队列上“选择”?和python %s 多个的讲解已经结束,谢谢您的阅读,如果想了解更多关于ELI5 Python 模块“选择”和“选择器”、python – 在主进程中异步等待多处理队列、python – 在多处理池中处理工作者死亡、python-3.x – 多处理队列子类问题的相关知识,请在本站搜索。
本文标签: