GVKun编程网logo

在Python中的进程之间共享许多队列(在python中的进程之间共享许多队列)

20

本文将介绍在Python中的进程之间共享许多队列的详细情况,特别是关于在python中的进程之间共享许多队列的相关信息。我们将通过案例分析、数据研究等多种方式,帮助您更全面地了解这个主题,同时也将涉及

本文将介绍在Python中的进程之间共享许多队列的详细情况,特别是关于在python中的进程之间共享许多队列的相关信息。我们将通过案例分析、数据研究等多种方式,帮助您更全面地了解这个主题,同时也将涉及一些关于linux – 在不相关的进程之间共享futex、python 30 进程之间的相互独立、进程之间的时间差、python multiprocessing - 在进程之间共享类字典,随后从进程写入反映到共享内存、python – 在进程对象之间共享SciPy稀疏数组的知识。

本文目录一览:

在Python中的进程之间共享许多队列(在python中的进程之间共享许多队列)

在Python中的进程之间共享许多队列(在python中的进程之间共享许多队列)

我知道multiprocessing.Manager()如何将其用于创建共享对象,尤其是可以在工作人员之间共享的队列。有这个问题,这个问题,[这个问题](http://codingdict.com/questions/1299甚至是我自己的一个问题。

但是,我需要定义很多队列,每个队列都链接一对特定的进程。假设每对进程及其链接队列均由变量标识key

当我需要放置和获取数据时,我想使用字典来访问我的队列。我无法完成这项工作。我已经尝试了很多东西。随着multiprocessing进口为mp

for key in all_keys: DICT[key] =mp.Queue在多处理模块(称为multi.py)导入的配置文件中那样定义dict不会返回错误,但是DICT[key]在进程之间不共享队列,每个进程似乎都有自己的队列副本,因此不会发生通信。

如果我尝试在定义DICT进程并启动它们的主要多处理函数的开始处定义,例如

DICT = mp.Manager().dict()    for key in all_keys:    DICT[key] = mp.Queue()

我得到错误

RuntimeError: Queue objects should only be shared between processes through inheritance

更改为

DICT = mp.Manager().dict()    for key in all_keys:    DICT[key] = mp.Manager().Queue()

只会使一切变得更糟。multi.py在main函数的顶部而不是内部尝试类似的定义会返回类似的错误。

必须有一种方法可以在进程之间共享许多队列,而无需在代码中明确命名每个队列。有任何想法吗?

编辑

这是程序的基本架构:

1-加载第一个模块,该模块定义一些变量,import
multi,launchsmulti.main()和加载另一个模块,该模块开始一系列模块加载和代码执行。与此同时…

2-multi.main看起来像这样:

def main():    manager = mp.Manager()    pool = mp.Pool()    DICT2 = manager.dict()    for key in all_keys:        DICT2[key] = manager.Queue()        proc_1 = pool.apply_async(targ1,(DICT1[key],) ) #DICT1 is defined in the config file        proc_2 =  pool.apply_async(targ2,(DICT2[key], otherargs,)

除了使用pooland之外manager,我还启动了以下程序:

mp.Process(target=targ1, args=(DICT[key],))

3-该函数targ1接收key来自主进程的输入数据(按排序)。它旨在将结果传递给它,DICT[key]以便targ2可以执行其工作。这是不起作用的部分。有任意数量的targ1s,targ2s等,因此有任意数量的队列。

4-这些过程中的某些过程的结果将被发送到一堆不同的数组/熊猫数据帧,这些数据帧也由索引key,我希望可以从任意过程中访问它们,甚至是在不同模块中启动的过程。我还没有写这部分,这可能是一个不同的问题。(我在这里提到它是因为上面3的答案也可能很好地解决了4。)

答案1

小编典典

当您尝试multiprocessing.Queue()通过传递a作为参数共享a时,听起来好像问题开始了。您可以通过创建托管队列来解决此问题:

import multiprocessingmanager = multiprocessing.Manager()passable_queue = manager.Queue()

当使用管理器创建它时,您正在存储并传递一个 代理
到队列,而不是队列本身,因此即使传递给工作进程的对象是复制的,它仍将指向相同的基础数据结构:您的队列。在概念上,它与C / C
++中的指针非常相似。如果以这种方式创建队列,则在启动工作进程时将能够通过它们。

由于您现在可以传递队列,因此不再需要管理字典。在main中保留一个普通字典,该字典将存储所有映射,并且仅为您的工作进程提供所需的队列,因此他们无需访问任何映射。

我在这里写了一个例子。看起来您在工作人员之间传递对象,这就是在这里完成的工作。假设我们有两个处理阶段,并且数据在的控制下开始和结束main。看看我们如何创建像管道一样连接工人的队列,但是通过
给他们提供 他们需要的队列 ,就不需要他们知道任何映射:

import multiprocessing as mpdef stage1(q_in, q_out):    q_out.put(q_in.get()+"Stage 1 did some work.\n")    returndef stage2(q_in, q_out):    q_out.put(q_in.get()+"Stage 2 did some work.\n")    returndef main():    pool = mp.Pool()    manager = mp.Manager()    # create managed queues    q_main_to_s1 = manager.Queue()    q_s1_to_s2 = manager.Queue()    q_s2_to_main = manager.Queue()    # launch workers, passing them the queues they need    results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))    results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))    # Send a message into the pipeline    q_main_to_s1.put("Main started the job.\n")    # Wait for work to complete    print(q_s2_to_main.get()+"Main finished the job.")    pool.close()    pool.join()    returnif __name__ == "__main__":    main()

代码产生以下输出:

Main开始了工作。
第一阶段做了一些工作。
第二阶段做了一些工作。
Main完成了工作。

我没有提供AsyncResults在字典中存储队列或对象的示例,因为我仍然不太了解您的程序应该如何工作。但是,既然您可以自由地传递队列,则可以构建字典来根据需要存储队列/进程映射。

实际上,如果您确实在多个工作人员之间建立了一条管道,那么您甚至不需要保留对中“工作人员之间”队列的引用main。创建队列,将其传递给您的工作人员,然后仅保留对main将要使用的队列的引用。如果您确实有“任意数量”的队列,我绝对建议您尝试尽快让旧队列被垃圾回收。

linux – 在不相关的进程之间共享futex

linux – 在不相关的进程之间共享futex

不相关的流程如何使用futex进行合作?

假设我有不相关的进程,例如,一个是带有我的模块的apache子进程,另一个例如是后台脚本.

我想使用futex在两者之间建立一个带有互斥的条件变量,以便为用户空间快速代码路径带来好处.

在我看来,存储互斥锁的存储器可以在mmap的文件中,如果该存储器被映射,mlock’d这两个进程理论上可以针对相同的地址发出futex调用.

或者,也许可以使用FUTEX_FD将futex从一个进程传递到另一个进程.

代码提交接受低级,高级和动态语言(C,C,Python等).还必须支持“robust futex”API.

参考文献:

> https://www.kernel.org/doc/Documentation/robust-futexes.txt
> http://locklessinc.com/articles/mutex_cv_futex/

最佳答案
感谢Phillip和Felix M.指针.

Python用户代码(已存在数据结构的文件,使用PTHREAD_PROCESS_SHARED初始化)

with open("/tmp/semaphore","rb+") as f:
    m = mmap.mmap(f.fileno(),0)  # default: all file,share,read-write

data = ffi.cast("unsigned long[3]",id(m))[2]  # pointer to mapped area,64-bit cpython
lock = ffi.cast("pthread_mutex_t *",data)
cond = ffi.cast("pthread_cond_t *",data + 40)

@contextlib.contextmanager
def locked(alock):
    assert not C.pthread_mutex_lock(alock)
    try:
        yield
    finally:
        assert not C.pthread_mutex_unlock(alock)

等等醒来​​:

if "wait" in sys.argv:
    with locked(lock):
        assert not C.pthread_cond_wait(cond,lock)

elif "signal" in sys.argv:
    with locked(lock):
        assert not C.pthread_cond_signal(cond)

设置PTHREAD_PROCESS_SHARED的基础知识:

l = ffi.new("pthread_mutexattr_t *")
assert not C.pthread_mutexattr_init(l)
assert not C.pthread_mutexattr_setpshared(l,1)  # PTHREAD_PROCESS_SHARED
assert not C.pthread_mutex_init(lock,l)
# same for condition variable

针对nitpicks的完整代码:-) https://github.com/dimaqq/pthread_mutexattr_init/blob/master/xsem.py基于http://linux.die.net/man/3/pthread_mutexattr_init

python 30 进程之间的相互独立、进程之间的时间差

python 30 进程之间的相互独立、进程之间的时间差

# import time
# import os
# from multiprocessing import Process
#
# def func1():
# time.sleep(3)
# print(''我是func1'')
#
# def func2():
# time.sleep(3)
# print(''我是func2'')
# print(''子进程的pid'', os.getpid())
# print(''子进程的父进程'', os.getppid())
#
# start_time = time.time()
# func1()
# func2()
# end_time = time.time()
# print(''时间差>>>>'', end_time-start_time)
#
# if __name__ == ''__main__'':
# print(''主进程的pid'', os.getpid())
# start_time1 = time.time()
# p = Process(target=func2) # 创建一个func2的新进程、与func1同时进行(那个进程先结束就先显示)
# p.start()
#
# func1()
# end_time1 = time.time() # 由于你的程序是上往下走的在发起func2这个进程是func1已经走完
# print(''时间差>>>>'', end_time1 - start_time1) # 因此这个时间差实际上只是主程序运行的时间并没有考虑func2的运行时间
# **************************************分割线***************************************
# print(''分割线''.center(80, ''*''))

#真正的func1、func2并发的时间:由于func1/func2是同事进行的,算两个进程中运行的最长时间就是func1、func2的时间

# 两个进程的真正时间差

# from multiprocessing import Process
# import time
#
# def func1():
# time.sleep(1)
# print(''我是func1'')
# time1 = time.time()
# print(''我是时间1>>>'', time1)
#
#
# def func2():
# time.sleep(3)
# print(''我是func2'')
# time2 = time.time()
# print(''我是时间2>>>'', time2)
#
#
# if __name__ == ''__main__'':
# print(''我是主程序'')
# p = Process(target=func2)
# p.start()
#
# func1()
# end_time = time.time()
# func1、func2真正的时间差是打印出来的时间1 - 打印出来的时间2

# **************************************分割线***************************************
# 子进程和主进程在空间上是相互独立的
# from multiprocessing import Process
#
# x = 100
#
# def func():
# global x
# x = 10
# print(id(x))
# print(''子进程>>>'', x)
#
# if __name__ == ''__main__'':
#
# p = Process(target=func)
# p.start()
# p.join()
# print(id(x))
# print(''主进程>>>'', x)
# # 结果:
# # 263469456
# # 子进程>>> 10
# # 263470896
# # 主进程>>> 100 # 内存地址是相互独立的 且值是相互独立的

python multiprocessing - 在进程之间共享类字典,随后从进程写入反映到共享内存

python multiprocessing - 在进程之间共享类字典,随后从进程写入反映到共享内存

这是一个“陷阱”,它为外行人带来了很多惊喜。问题在于,当您拥有托管字典时,要查看传播的更新,您需要更改键或键的值。在这里,从技术上讲,您没有更改该值,也就是说,您仍在引用相同的对象实例(类型 ExampleClass),并且只更改了该引用中 的某些内容。很奇怪,我知道。这是您需要的修改后的方法 f

def f(self,dict):
    # generate a random index to add the class to
    index = str(random.randint(0,100))

    # create a new class at that index
    dict[index] = ExampleClass(str(random.randint(100,200)))

    # this is the problem,it doesn't share the updated variables in the dictionary between the processes <----------------------
    # attempt to change the created variables
    ec = dict[index]
    ec.count += 1
    ec.stringVar = "yeAH"
    dict[index] = ec # show new reference
    # print what's inside
    for x in dict.values():
        print(x.count,x.stringVar)

注意:

如果您使用以下代码来设置密钥/对值,以下内容实际上会打印 False

ec = ExampleClass(str(random.randint(100,200)))
dict[index] = ec
print(dict[index] is ec)

这就是为什么在修改后的方法 f 中,dict[index] = ec # show new reference 似乎是一个被设置为值的新引用。

此外,您应该考虑不使用 dict(一种内置数据类型)作为变量名。

python – 在进程对象之间共享SciPy稀疏数组

python – 在进程对象之间共享SciPy稀疏数组

我最近一直在学习 Python多处理,并遇到了障碍.我有一个稀疏的SciPy数组(CSC格式),我需要在5个工作进程之间以只读格式共享.我读过 this和 this(numpy-shared),但这似乎只适用于密集类型.

如何在不复制(或使用最少复制)5个多处理过程对象的情况下共享scipy.sparse.csc_matrix()?即使是numpy-shared方法似乎也需要复制整个数组,即便如此,我也不能将scipy.sparse转换为mp.Array().任何人都可以帮我指出正确的方向吗?

谢谢!

解决方法

我无法帮助您处理问题的多处理部分,但CSC稀疏矩阵只是三个numpy数组.您可以实例化另一个稀疏矩阵b,通过执行以下操作来共享与稀疏矩阵相同的内存对象:
import scipy.sparse as sps

b = sps.csc_matrix((a.data,a.indices,a.indptr),shape=a.shape,copy=False)

a.data,a.indices和a.indptr是你想要在你的进程之间共享的三个numpy数组,如果你能这样做,那么在每个进程中实例化一个稀疏矩阵将是一个廉价的操作.

关于在Python中的进程之间共享许多队列在python中的进程之间共享许多队列的介绍已经告一段落,感谢您的耐心阅读,如果想了解更多关于linux – 在不相关的进程之间共享futex、python 30 进程之间的相互独立、进程之间的时间差、python multiprocessing - 在进程之间共享类字典,随后从进程写入反映到共享内存、python – 在进程对象之间共享SciPy稀疏数组的相关信息,请在本站寻找。

本文标签: