本文将介绍在Python中的进程之间共享许多队列的详细情况,特别是关于在python中的进程之间共享许多队列的相关信息。我们将通过案例分析、数据研究等多种方式,帮助您更全面地了解这个主题,同时也将涉及
本文将介绍在Python中的进程之间共享许多队列的详细情况,特别是关于在python中的进程之间共享许多队列的相关信息。我们将通过案例分析、数据研究等多种方式,帮助您更全面地了解这个主题,同时也将涉及一些关于linux – 在不相关的进程之间共享futex、python 30 进程之间的相互独立、进程之间的时间差、python multiprocessing - 在进程之间共享类字典,随后从进程写入反映到共享内存、python – 在进程对象之间共享SciPy稀疏数组的知识。
本文目录一览:- 在Python中的进程之间共享许多队列(在python中的进程之间共享许多队列)
- linux – 在不相关的进程之间共享futex
- python 30 进程之间的相互独立、进程之间的时间差
- python multiprocessing - 在进程之间共享类字典,随后从进程写入反映到共享内存
- python – 在进程对象之间共享SciPy稀疏数组
在Python中的进程之间共享许多队列(在python中的进程之间共享许多队列)
我知道multiprocessing.Manager()
如何将其用于创建共享对象,尤其是可以在工作人员之间共享的队列。有这个问题,这个问题,[这个问题](http://codingdict.com/questions/1299甚至是我自己的一个问题。
但是,我需要定义很多队列,每个队列都链接一对特定的进程。假设每对进程及其链接队列均由变量标识key
。
当我需要放置和获取数据时,我想使用字典来访问我的队列。我无法完成这项工作。我已经尝试了很多东西。随着multiprocessing
进口为mp
:
像for key in all_keys: DICT[key] =mp.Queue
在多处理模块(称为multi.py
)导入的配置文件中那样定义dict不会返回错误,但是DICT[key]
在进程之间不共享队列,每个进程似乎都有自己的队列副本,因此不会发生通信。
如果我尝试在定义DICT
进程并启动它们的主要多处理函数的开始处定义,例如
DICT = mp.Manager().dict() for key in all_keys: DICT[key] = mp.Queue()
我得到错误
RuntimeError: Queue objects should only be shared between processes through inheritance
更改为
DICT = mp.Manager().dict() for key in all_keys: DICT[key] = mp.Manager().Queue()
只会使一切变得更糟。multi.py
在main函数的顶部而不是内部尝试类似的定义会返回类似的错误。
必须有一种方法可以在进程之间共享许多队列,而无需在代码中明确命名每个队列。有任何想法吗?
编辑
这是程序的基本架构:
1-加载第一个模块,该模块定义一些变量,importmulti
,launchsmulti.main()
和加载另一个模块,该模块开始一系列模块加载和代码执行。与此同时…
2-multi.main
看起来像这样:
def main(): manager = mp.Manager() pool = mp.Pool() DICT2 = manager.dict() for key in all_keys: DICT2[key] = manager.Queue() proc_1 = pool.apply_async(targ1,(DICT1[key],) ) #DICT1 is defined in the config file proc_2 = pool.apply_async(targ2,(DICT2[key], otherargs,)
除了使用pool
and之外manager
,我还启动了以下程序:
mp.Process(target=targ1, args=(DICT[key],))
3-该函数targ1
接收key
来自主进程的输入数据(按排序)。它旨在将结果传递给它,DICT[key]
以便targ2
可以执行其工作。这是不起作用的部分。有任意数量的targ1
s,targ2
s等,因此有任意数量的队列。
4-这些过程中的某些过程的结果将被发送到一堆不同的数组/熊猫数据帧,这些数据帧也由索引key
,我希望可以从任意过程中访问它们,甚至是在不同模块中启动的过程。我还没有写这部分,这可能是一个不同的问题。(我在这里提到它是因为上面3的答案也可能很好地解决了4。)
答案1
小编典典当您尝试multiprocessing.Queue()
通过传递a作为参数共享a时,听起来好像问题开始了。您可以通过创建托管队列来解决此问题:
import multiprocessingmanager = multiprocessing.Manager()passable_queue = manager.Queue()
当使用管理器创建它时,您正在存储并传递一个 代理
到队列,而不是队列本身,因此即使传递给工作进程的对象是复制的,它仍将指向相同的基础数据结构:您的队列。在概念上,它与C / C
++中的指针非常相似。如果以这种方式创建队列,则在启动工作进程时将能够通过它们。
由于您现在可以传递队列,因此不再需要管理字典。在main中保留一个普通字典,该字典将存储所有映射,并且仅为您的工作进程提供所需的队列,因此他们无需访问任何映射。
我在这里写了一个例子。看起来您在工作人员之间传递对象,这就是在这里完成的工作。假设我们有两个处理阶段,并且数据在的控制下开始和结束main
。看看我们如何创建像管道一样连接工人的队列,但是通过
只 给他们提供 他们需要的队列 ,就不需要他们知道任何映射:
import multiprocessing as mpdef stage1(q_in, q_out): q_out.put(q_in.get()+"Stage 1 did some work.\n") returndef stage2(q_in, q_out): q_out.put(q_in.get()+"Stage 2 did some work.\n") returndef main(): pool = mp.Pool() manager = mp.Manager() # create managed queues q_main_to_s1 = manager.Queue() q_s1_to_s2 = manager.Queue() q_s2_to_main = manager.Queue() # launch workers, passing them the queues they need results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2)) results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main)) # Send a message into the pipeline q_main_to_s1.put("Main started the job.\n") # Wait for work to complete print(q_s2_to_main.get()+"Main finished the job.") pool.close() pool.join() returnif __name__ == "__main__": main()
代码产生以下输出:
Main开始了工作。
第一阶段做了一些工作。
第二阶段做了一些工作。
Main完成了工作。
我没有提供AsyncResults
在字典中存储队列或对象的示例,因为我仍然不太了解您的程序应该如何工作。但是,既然您可以自由地传递队列,则可以构建字典来根据需要存储队列/进程映射。
实际上,如果您确实在多个工作人员之间建立了一条管道,那么您甚至不需要保留对中“工作人员之间”队列的引用main
。创建队列,将其传递给您的工作人员,然后仅保留对main
将要使用的队列的引用。如果您确实有“任意数量”的队列,我绝对建议您尝试尽快让旧队列被垃圾回收。
linux – 在不相关的进程之间共享futex
不相关的流程如何使用futex进行合作?
假设我有不相关的进程,例如,一个是带有我的模块的apache子进程,另一个例如是后台脚本.
我想使用futex在两者之间建立一个带有互斥的条件变量,以便为用户空间快速代码路径带来好处.
在我看来,存储互斥锁的存储器可以在mmap的文件中,如果该存储器被映射,mlock’d这两个进程理论上可以针对相同的地址发出futex调用.
或者,也许可以使用FUTEX_FD将futex从一个进程传递到另一个进程.
代码提交接受低级,高级和动态语言(C,C,Python等).还必须支持“robust futex”API.
参考文献:
> https://www.kernel.org/doc/Documentation/robust-futexes.txt
> http://locklessinc.com/articles/mutex_cv_futex/
Python用户代码(已存在数据结构的文件,使用PTHREAD_PROCESS_SHARED初始化)
with open("/tmp/semaphore","rb+") as f:
m = mmap.mmap(f.fileno(),0) # default: all file,share,read-write
data = ffi.cast("unsigned long[3]",id(m))[2] # pointer to mapped area,64-bit cpython
lock = ffi.cast("pthread_mutex_t *",data)
cond = ffi.cast("pthread_cond_t *",data + 40)
@contextlib.contextmanager
def locked(alock):
assert not C.pthread_mutex_lock(alock)
try:
yield
finally:
assert not C.pthread_mutex_unlock(alock)
等等醒来:
if "wait" in sys.argv:
with locked(lock):
assert not C.pthread_cond_wait(cond,lock)
elif "signal" in sys.argv:
with locked(lock):
assert not C.pthread_cond_signal(cond)
设置PTHREAD_PROCESS_SHARED的基础知识:
l = ffi.new("pthread_mutexattr_t *")
assert not C.pthread_mutexattr_init(l)
assert not C.pthread_mutexattr_setpshared(l,1) # PTHREAD_PROCESS_SHARED
assert not C.pthread_mutex_init(lock,l)
# same for condition variable
针对nitpicks的完整代码:-) https://github.com/dimaqq/pthread_mutexattr_init/blob/master/xsem.py基于http://linux.die.net/man/3/pthread_mutexattr_init
python 30 进程之间的相互独立、进程之间的时间差
# import time
# import os
# from multiprocessing import Process
#
# def func1():
# time.sleep(3)
# print(''我是func1'')
#
# def func2():
# time.sleep(3)
# print(''我是func2'')
# print(''子进程的pid'', os.getpid())
# print(''子进程的父进程'', os.getppid())
#
# start_time = time.time()
# func1()
# func2()
# end_time = time.time()
# print(''时间差>>>>'', end_time-start_time)
#
# if __name__ == ''__main__'':
# print(''主进程的pid'', os.getpid())
# start_time1 = time.time()
# p = Process(target=func2) # 创建一个func2的新进程、与func1同时进行(那个进程先结束就先显示)
# p.start()
#
# func1()
# end_time1 = time.time() # 由于你的程序是上往下走的在发起func2这个进程是func1已经走完
# print(''时间差>>>>'', end_time1 - start_time1) # 因此这个时间差实际上只是主程序运行的时间并没有考虑func2的运行时间
# **************************************分割线***************************************
# print(''分割线''.center(80, ''*''))
#真正的func1、func2并发的时间:由于func1/func2是同事进行的,算两个进程中运行的最长时间就是func1、func2的时间
# 两个进程的真正时间差
# from multiprocessing import Process
# import time
#
# def func1():
# time.sleep(1)
# print(''我是func1'')
# time1 = time.time()
# print(''我是时间1>>>'', time1)
#
#
# def func2():
# time.sleep(3)
# print(''我是func2'')
# time2 = time.time()
# print(''我是时间2>>>'', time2)
#
#
# if __name__ == ''__main__'':
# print(''我是主程序'')
# p = Process(target=func2)
# p.start()
#
# func1()
# end_time = time.time()
# func1、func2真正的时间差是打印出来的时间1 - 打印出来的时间2
# **************************************分割线***************************************
# 子进程和主进程在空间上是相互独立的
# from multiprocessing import Process
#
# x = 100
#
# def func():
# global x
# x = 10
# print(id(x))
# print(''子进程>>>'', x)
#
# if __name__ == ''__main__'':
#
# p = Process(target=func)
# p.start()
# p.join()
# print(id(x))
# print(''主进程>>>'', x)
# # 结果:
# # 263469456
# # 子进程>>> 10
# # 263470896
# # 主进程>>> 100 # 内存地址是相互独立的 且值是相互独立的
python multiprocessing - 在进程之间共享类字典,随后从进程写入反映到共享内存
这是一个“陷阱”,它为外行人带来了很多惊喜。问题在于,当您拥有托管字典时,要查看传播的更新,您需要更改键或键的值。在这里,从技术上讲,您没有更改该值,也就是说,您仍在引用相同的对象实例(类型 ExampleClass
),并且只更改了该引用中 的某些内容。很奇怪,我知道。这是您需要的修改后的方法 f
:
def f(self,dict):
# generate a random index to add the class to
index = str(random.randint(0,100))
# create a new class at that index
dict[index] = ExampleClass(str(random.randint(100,200)))
# this is the problem,it doesn't share the updated variables in the dictionary between the processes <----------------------
# attempt to change the created variables
ec = dict[index]
ec.count += 1
ec.stringVar = "yeAH"
dict[index] = ec # show new reference
# print what's inside
for x in dict.values():
print(x.count,x.stringVar)
注意:
如果您使用以下代码来设置密钥/对值,以下内容实际上会打印 False
:
ec = ExampleClass(str(random.randint(100,200)))
dict[index] = ec
print(dict[index] is ec)
这就是为什么在修改后的方法 f
中,dict[index] = ec # show new reference
似乎是一个被设置为值的新引用。
此外,您应该考虑不使用 dict
(一种内置数据类型)作为变量名。
python – 在进程对象之间共享SciPy稀疏数组
如何在不复制(或使用最少复制)5个多处理过程对象的情况下共享scipy.sparse.csc_matrix()?即使是numpy-shared方法似乎也需要复制整个数组,即便如此,我也不能将scipy.sparse转换为mp.Array().任何人都可以帮我指出正确的方向吗?
谢谢!
解决方法
import scipy.sparse as sps b = sps.csc_matrix((a.data,a.indices,a.indptr),shape=a.shape,copy=False)
a.data,a.indices和a.indptr是你想要在你的进程之间共享的三个numpy数组,如果你能这样做,那么在每个进程中实例化一个稀疏矩阵将是一个廉价的操作.
关于在Python中的进程之间共享许多队列和在python中的进程之间共享许多队列的介绍已经告一段落,感谢您的耐心阅读,如果想了解更多关于linux – 在不相关的进程之间共享futex、python 30 进程之间的相互独立、进程之间的时间差、python multiprocessing - 在进程之间共享类字典,随后从进程写入反映到共享内存、python – 在进程对象之间共享SciPy稀疏数组的相关信息,请在本站寻找。
本文标签: