对于想了解在Python中的进程之间共享许多队列的读者,本文将提供新的信息,我们将详细介绍在python中的进程之间共享许多队列,并且为您提供关于linux–在不相关的进程之间共享futex、pyth
对于想了解在Python中的进程之间共享许多队列的读者,本文将提供新的信息,我们将详细介绍在python中的进程之间共享许多队列,并且为您提供关于linux – 在不相关的进程之间共享futex、python 30 进程之间的相互独立、进程之间的时间差、python multiprocessing - 在进程之间共享类字典,随后从进程写入反映到共享内存、python – 在进程对象之间共享SciPy稀疏数组的有价值信息。
本文目录一览:- 在Python中的进程之间共享许多队列(在python中的进程之间共享许多队列)
- linux – 在不相关的进程之间共享futex
- python 30 进程之间的相互独立、进程之间的时间差
- python multiprocessing - 在进程之间共享类字典,随后从进程写入反映到共享内存
- python – 在进程对象之间共享SciPy稀疏数组
在Python中的进程之间共享许多队列(在python中的进程之间共享许多队列)
我知道multiprocessing.Manager()
如何将其用于创建共享对象,尤其是可以在工作人员之间共享的队列。有这个问题,这个问题,[这个问题](http://codingdict.com/questions/1299甚至是我自己的一个问题。
但是,我需要定义很多队列,每个队列都链接一对特定的进程。假设每对进程及其链接队列均由变量标识key
。
当我需要放置和获取数据时,我想使用字典来访问我的队列。我无法完成这项工作。我已经尝试了很多东西。随着multiprocessing
进口为mp
:
像for key in all_keys: DICT[key] =
mp.Queue
在多处理模块(称为multi.py
)导入的配置文件中那样定义dict不会返回错误,但是DICT[key]
在进程之间不共享队列,每个进程似乎都有自己的队列副本,因此不会发生通信。
如果我尝试在定义DICT
进程并启动它们的主要多处理函数的开始处定义,例如
DICT = mp.Manager().dict()
for key in all_keys:
DICT[key] = mp.Queue()
我得到错误
RuntimeError: Queue objects should only be shared between processes through
inheritance
更改为
DICT = mp.Manager().dict()
for key in all_keys:
DICT[key] = mp.Manager().Queue()
只会使一切变得更糟。multi.py
在main函数的顶部而不是内部尝试类似的定义会返回类似的错误。
必须有一种方法可以在进程之间共享许多队列,而无需在代码中明确命名每个队列。有任何想法吗?
编辑
这是程序的基本架构:
1-加载第一个模块,该模块定义一些变量,import
multi
,launchsmulti.main()
和加载另一个模块,该模块开始一系列模块加载和代码执行。与此同时…
2-multi.main
看起来像这样:
def main():
manager = mp.Manager()
pool = mp.Pool()
DICT2 = manager.dict()
for key in all_keys:
DICT2[key] = manager.Queue()
proc_1 = pool.apply_async(targ1,(DICT1[key],) ) #DICT1 is defined in the config file
proc_2 = pool.apply_async(targ2,(DICT2[key],otherargs,)
除了使用pool
and之外manager
,我还启动了以下程序:
mp.Process(target=targ1,args=(DICT[key],))
3-该函数targ1
接收key
来自主进程的输入数据(按排序)。它旨在将结果传递给它,DICT[key]
以便targ2
可以执行其工作。这是不起作用的部分。有任意数量的targ1
s,targ2
s等,因此有任意数量的队列。
4-这些过程中的某些过程的结果将被发送到一堆不同的数组/熊猫数据帧,这些数据帧也由索引key
,我希望可以从任意过程中访问它们,甚至是在不同模块中启动的过程。我还没有写这部分,这可能是一个不同的问题。(我在这里提到它是因为上面3的答案也可能很好地解决了4。)
linux – 在不相关的进程之间共享futex
不相关的流程如何使用futex进行合作?
假设我有不相关的进程,例如,一个是带有我的模块的apache子进程,另一个例如是后台脚本.
我想使用futex在两者之间建立一个带有互斥的条件变量,以便为用户空间快速代码路径带来好处.
在我看来,存储互斥锁的存储器可以在mmap的文件中,如果该存储器被映射,mlock’d这两个进程理论上可以针对相同的地址发出futex调用.
或者,也许可以使用FUTEX_FD将futex从一个进程传递到另一个进程.
代码提交接受低级,高级和动态语言(C,C,Python等).还必须支持“robust futex”API.
参考文献:
> https://www.kernel.org/doc/Documentation/robust-futexes.txt
> http://locklessinc.com/articles/mutex_cv_futex/
Python用户代码(已存在数据结构的文件,使用PTHREAD_PROCESS_SHARED初始化)
with open("/tmp/semaphore","rb+") as f:
m = mmap.mmap(f.fileno(),0) # default: all file,share,read-write
data = ffi.cast("unsigned long[3]",id(m))[2] # pointer to mapped area,64-bit cpython
lock = ffi.cast("pthread_mutex_t *",data)
cond = ffi.cast("pthread_cond_t *",data + 40)
@contextlib.contextmanager
def locked(alock):
assert not C.pthread_mutex_lock(alock)
try:
yield
finally:
assert not C.pthread_mutex_unlock(alock)
等等醒来:
if "wait" in sys.argv:
with locked(lock):
assert not C.pthread_cond_wait(cond,lock)
elif "signal" in sys.argv:
with locked(lock):
assert not C.pthread_cond_signal(cond)
设置PTHREAD_PROCESS_SHARED的基础知识:
l = ffi.new("pthread_mutexattr_t *")
assert not C.pthread_mutexattr_init(l)
assert not C.pthread_mutexattr_setpshared(l,1) # PTHREAD_PROCESS_SHARED
assert not C.pthread_mutex_init(lock,l)
# same for condition variable
针对nitpicks的完整代码:-) https://github.com/dimaqq/pthread_mutexattr_init/blob/master/xsem.py基于http://linux.die.net/man/3/pthread_mutexattr_init
python 30 进程之间的相互独立、进程之间的时间差
# import time
# import os
# from multiprocessing import Process
#
# def func1():
# time.sleep(3)
# print(''我是func1'')
#
# def func2():
# time.sleep(3)
# print(''我是func2'')
# print(''子进程的pid'', os.getpid())
# print(''子进程的父进程'', os.getppid())
#
# start_time = time.time()
# func1()
# func2()
# end_time = time.time()
# print(''时间差>>>>'', end_time-start_time)
#
# if __name__ == ''__main__'':
# print(''主进程的pid'', os.getpid())
# start_time1 = time.time()
# p = Process(target=func2) # 创建一个func2的新进程、与func1同时进行(那个进程先结束就先显示)
# p.start()
#
# func1()
# end_time1 = time.time() # 由于你的程序是上往下走的在发起func2这个进程是func1已经走完
# print(''时间差>>>>'', end_time1 - start_time1) # 因此这个时间差实际上只是主程序运行的时间并没有考虑func2的运行时间
# **************************************分割线***************************************
# print(''分割线''.center(80, ''*''))
#真正的func1、func2并发的时间:由于func1/func2是同事进行的,算两个进程中运行的最长时间就是func1、func2的时间
# 两个进程的真正时间差
# from multiprocessing import Process
# import time
#
# def func1():
# time.sleep(1)
# print(''我是func1'')
# time1 = time.time()
# print(''我是时间1>>>'', time1)
#
#
# def func2():
# time.sleep(3)
# print(''我是func2'')
# time2 = time.time()
# print(''我是时间2>>>'', time2)
#
#
# if __name__ == ''__main__'':
# print(''我是主程序'')
# p = Process(target=func2)
# p.start()
#
# func1()
# end_time = time.time()
# func1、func2真正的时间差是打印出来的时间1 - 打印出来的时间2
# **************************************分割线***************************************
# 子进程和主进程在空间上是相互独立的
# from multiprocessing import Process
#
# x = 100
#
# def func():
# global x
# x = 10
# print(id(x))
# print(''子进程>>>'', x)
#
# if __name__ == ''__main__'':
#
# p = Process(target=func)
# p.start()
# p.join()
# print(id(x))
# print(''主进程>>>'', x)
# # 结果:
# # 263469456
# # 子进程>>> 10
# # 263470896
# # 主进程>>> 100 # 内存地址是相互独立的 且值是相互独立的
python multiprocessing - 在进程之间共享类字典,随后从进程写入反映到共享内存
这是一个“陷阱”,它为外行人带来了很多惊喜。问题在于,当您拥有托管字典时,要查看传播的更新,您需要更改键或键的值。在这里,从技术上讲,您没有更改该值,也就是说,您仍在引用相同的对象实例(类型 ExampleClass
),并且只更改了该引用中 的某些内容。很奇怪,我知道。这是您需要的修改后的方法 f
:
def f(self,dict):
# generate a random index to add the class to
index = str(random.randint(0,100))
# create a new class at that index
dict[index] = ExampleClass(str(random.randint(100,200)))
# this is the problem,it doesn't share the updated variables in the dictionary between the processes <----------------------
# attempt to change the created variables
ec = dict[index]
ec.count += 1
ec.stringVar = "yeAH"
dict[index] = ec # show new reference
# print what's inside
for x in dict.values():
print(x.count,x.stringVar)
注意:
如果您使用以下代码来设置密钥/对值,以下内容实际上会打印 False
:
ec = ExampleClass(str(random.randint(100,200)))
dict[index] = ec
print(dict[index] is ec)
这就是为什么在修改后的方法 f
中,dict[index] = ec # show new reference
似乎是一个被设置为值的新引用。
此外,您应该考虑不使用 dict
(一种内置数据类型)作为变量名。
python – 在进程对象之间共享SciPy稀疏数组
如何在不复制(或使用最少复制)5个多处理过程对象的情况下共享scipy.sparse.csc_matrix()?即使是numpy-shared方法似乎也需要复制整个数组,即便如此,我也不能将scipy.sparse转换为mp.Array().任何人都可以帮我指出正确的方向吗?
谢谢!
解决方法
import scipy.sparse as sps b = sps.csc_matrix((a.data,a.indices,a.indptr),shape=a.shape,copy=False)
a.data,a.indices和a.indptr是你想要在你的进程之间共享的三个numpy数组,如果你能这样做,那么在每个进程中实例化一个稀疏矩阵将是一个廉价的操作.
今天关于在Python中的进程之间共享许多队列和在python中的进程之间共享许多队列的介绍到此结束,谢谢您的阅读,有关linux – 在不相关的进程之间共享futex、python 30 进程之间的相互独立、进程之间的时间差、python multiprocessing - 在进程之间共享类字典,随后从进程写入反映到共享内存、python – 在进程对象之间共享SciPy稀疏数组等更多相关知识的信息可以在本站进行查询。
本文标签: