GVKun编程网logo

在多个Python多处理队列上“选择”?(python %s 多个)

13

最近很多小伙伴都在问在多个Python多处理队列上“选择”?和python%s多个这两个问题,那么本篇文章就来给大家详细解答一下,同时本文还将给你拓展ELI5Python模块“选择”和“选择器”、py

最近很多小伙伴都在问在多个Python多处理队列上“选择”?python %s 多个这两个问题,那么本篇文章就来给大家详细解答一下,同时本文还将给你拓展ELI5 Python 模块“选择”和“选择器”、python – 在主进程中异步等待多处理队列、python – 在多处理池中处理工作者死亡、python-3.x – 多处理队列子类问题等相关知识,下面开始了哦!

本文目录一览:

在多个Python多处理队列上“选择”?(python %s 多个)

在多个Python多处理队列上“选择”?(python %s 多个)

等待(不旋转)直到两个(多处理)队列中的任何一个都可用的最佳方法是什么(两者都驻留在同一系统上)?

ELI5 Python 模块“选择”和“选择器”

ELI5 Python 模块“选择”和“选择器”

如何解决ELI5 Python 模块“选择”和“选择器”?

我一直在尝试使用“socket”模块和编写小型服务器。现在我 偶然发现了“选择”和“选择器”库。我一直试图围绕他们所做的事情进行思考,但还没有找到任何对我来说足够愚蠢的解释。 有人可以把它给我吗?

非常感谢。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

python – 在主进程中异步等待多处理队列

python – 在主进程中异步等待多处理队列

我有以下场景:多个工作进程将有关其当前状态的事件发送到事件调度程序.如果我们在主进程中,则此事件调度程序需要处理所有事件,或者如果我们处于工作进程中,则表示主进程的事件调度程序处理这些事件.

这里的主要关键是事件处理也必须在主进程的主线程中,所以我不能在线程内运行一段时间的True循环并等待来自那里的工作进程的消息.

所以我拥有的是:

import asyncio
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import current_process,Process,Queue
from threading import current_thread
from time import sleep

def get_q(q):
    print("Waiting for the queue ({} / {})\n".format(current_thread().name,current_process().name))
    return q.get()

async def message_q(q):
    while True:
        f = loop.run_in_executor(None,get_q,q)

        await f

        if f.result() is None:
            print("Done")
            return;

        print("Got the result ({} / {})".format(current_thread().name,current_process().name))
        print("Result is: {}\n".format(f.result()))

async def something_else():
    while True:
        print("Something else\n")
        await asyncio.sleep(2)

def other_process(q):
    for i in range(5):
        print("Putting something in the queue ({})".format(current_process().name))
        q.put(i)
        sleep(1)

    q.put(None)

q = Queue()

Process(target=other_process,args=(q,),daemon=True).start()

loop = asyncio.get_event_loop()
loop.set_default_executor(ThreadPoolExecutor(max_workers=1))
asyncio.ensure_future(message_q(q))
asyncio.ensure_future(something_else())
loop.run_until_complete(asyncio.sleep(6))

other_process()是一个示例性的工作进程,它使用队列来发信号通知主进程,该进程运行事件循环来处理东西并等待队列上的任何数据.在实际情况中,此过程将向事件调度程序发出信号,该事件调度程序随后将处理队列消息传递,将消息传递给主进程事件调度程序,但在此我稍微简化了一下.

但是,我对此并不十分满意.一次又一次地将get_q()提交给ThreadPoolExecutor会产生更多的开销,并且不像一个长时间运行的线程那样干净.此外,只要队列中没有其他数据,就会等待f没有最佳和阻塞,这会阻止事件循环退出.我的解决方法是在worker完成后发送None,如果队列中没有None,则退出message_q().

有没有更好的方法来实现这个?性能非常关键,我想将Queue对象保持在事件调度程序的本地,而不是将其传递给管理工作进程的代码(或者需要调用某种finalize()方法).

最佳答案
我现在将其实现为异步上下文管理器.上下文管理器调用

asyncio.ensure_future(message_q())

在其__aenter __()方法中,在其__aexit __()方法中将None添加到队列中以关闭message_q()中的无限循环.

然后,可以在process-spawning代码部分的async with语句中使用上下文管理器,从而无需手动调用shutdown方法.但是,在确保message_q()协程允许上下文管理器初始化队列侦听器之后,建议在__aenter __()方法中调用await asyncio.sleep(0).否则,不会立即调用message_q().这本身不是问题(因为队列无论如何都被填充),但是它会延迟事件转发,直到代码中出现下一个等待.

应该使用ProcesspoolExecutor和loop.run_in_executor()生成进程,因此等待进程不会阻止事件循环.

您可能还希望使用JoinableQueue来确保在退出上下文管理器之前处理了所有事件,而不是使用Queue.

python – 在多处理池中处理工作者死亡

python – 在多处理池中处理工作者死亡

我有一个简单的服务器:

from multiprocessing import Pool,TimeoutError
import time
import os


if __name__ == '__main__':
    # start worker processes
    pool = Pool(processes=1)

    while True:
        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid,())  # runs in *only* one process
        try:
            print(res.get(timeout=1))             # prints the PID of that process
        except TimeoutError:
            print('worker timed out')

        time.sleep(5)

    pool.close()
    print("Now the pool is closed and no longer available")
    pool.join()
    print("Done")

如果我运行这个,我会得到类似的东西:

47292
47292

然后我在服务器运行时杀了47292.启动了新的工作进程,但服务器的输出是:

47292
47292
worker timed out
worker timed out
worker timed out

池仍在尝试向旧工作进程发送请求.

我已经完成了在服务器和工作程序中捕获信号的一些工作,我可以获得稍微好一点的行为,但服务器似乎仍在等待关闭死亡的孩子(即.pool.join()永远不会结束) .

处理工人死亡的正确方法是什么?

如果没有工人死亡,那么从服务器进程中正常关闭工作人员似乎才有效.

(在Python 3.4.4上,但如果有帮助的话,很高兴升级.)

更新:
有趣的是,如果使用processes = 2创建池并且您杀死一个工作进程,等待几秒钟并终止另一个进程,则不会发生此工作者超时问题.但是,如果您快速连续杀死两个工作进程,则“工作超时”问题会再次出现.

也许相关的是,当问题发生时,终止服务器进程将使工作进程保持运行.

最佳答案
此行为来自multiprocessing.Pool的设计.当你杀死一个工人时,你可能会杀死一个持有call_queue.rlock的工人.当这个进程在持有锁时被终止时,其他进程将不再能够读入call_queue,因为它无法再与其工作者通信而破坏了Pool.
所以实际上没有办法杀死一个工人,并确保你的游泳池之后仍然没问题,因为你可能会陷入僵局.

multiprocessing.Pool不处理工人死亡.您可以尝试使用concurrent.futures.ProcesspoolExecutor(使用稍微不同的API)来处理默认情况下进程的失败.当进程在ProcesspoolExecutor中死亡时,整个执行程序将关闭并返回brokenProcesspool错误.

请注意,此实现中还有其他死锁,应在loky中修复.(免责声明:我是此库的维护者).此外,loky允许您使用ReusablePoolExecutor和方法_resize调整现有执行程序的大小.如果您有兴趣,请告诉我,从这个软件包开始,我可以为您提供一些帮助. (我意识到我们仍然需要对文档进行一些工作…… 0_0)

python-3.x – 多处理队列子类问题

python-3.x – 多处理队列子类问题

我想将multiprocessing.Queue子类化,以实现抓取队列块的进程.唯一的问题是,我得到一个奇怪的TypeError?
#!/usr/bin/env python

#whaaaaa!?

from multiprocessing import Queue

class BufferQueue(Queue):
    '''A thread/process safe queue for append/popleft operations with the import
    buffer.'''

    def __init__(self,**kwargs):
        super(BufferQueue,self).__init__(**kwargs)

    def consume(self,lim):
        '''Consume up to but no more than lim elements and return them in a new
        list,cleaning up the buffer.

        @params
        lim -- the maximum (limit) to consume from the list.  If less items
        exist in the list then that's fine too.
        '''
        lim = len(queue) if len(queue) < lim else lim
        return [self.popleft() for i in range(lim)]

测试这个(我将其拆分,以便我不会拉其他任何东西)

| => ./tests/wtf_queue.py 
Traceback (most recent call last):
  File "./tests/wtf_queue.py",line 10,in <module>
    class BufferQueue(Queue):
TypeError: method expected 2 arguments,got 3

编辑/更新:

解决方法

multiprocessing.Queue是一个创建队列的方法,因此您应该将其用作函数my_queue = Queue().
>>> from multiprocessing import Queue
>>> type(Queue)
<class 'method'>

正如您所看到的,不是“类型”,您将使用它来进行子类化.

如果要实现自己的队列,可以查看queue.Queue

编辑:

如果要从多处理子类化队列,请改用multiprocessing.queues.Queue,这是multiprocessing.Queue()返回的对象的类型.

今天关于在多个Python多处理队列上“选择”?python %s 多个的讲解已经结束,谢谢您的阅读,如果想了解更多关于ELI5 Python 模块“选择”和“选择器”、python – 在主进程中异步等待多处理队列、python – 在多处理池中处理工作者死亡、python-3.x – 多处理队列子类问题的相关知识,请在本站搜索。

本文标签: