GVKun编程网logo

在Python中的进程之间共享许多队列(在python中的进程之间共享许多队列)

15

对于想了解在Python中的进程之间共享许多队列的读者,本文将提供新的信息,我们将详细介绍在python中的进程之间共享许多队列,并且为您提供关于linux–在不相关的进程之间共享futex、pyth

对于想了解在Python中的进程之间共享许多队列的读者,本文将提供新的信息,我们将详细介绍在python中的进程之间共享许多队列,并且为您提供关于linux – 在不相关的进程之间共享futex、python 30 进程之间的相互独立、进程之间的时间差、python multiprocessing - 在进程之间共享类字典,随后从进程写入反映到共享内存、python – 在进程对象之间共享SciPy稀疏数组的有价值信息。

本文目录一览:

在Python中的进程之间共享许多队列(在python中的进程之间共享许多队列)

在Python中的进程之间共享许多队列(在python中的进程之间共享许多队列)

我知道multiprocessing.Manager()如何将其用于创建共享对象,尤其是可以在工作人员之间共享的队列。有这个问题,这个问题,[这个问题](http://codingdict.com/questions/1299甚至是我自己的一个问题。

但是,我需要定义很多队列,每个队列都链接一对特定的进程。假设每对进程及其链接队列均由变量标识key

当我需要放置和获取数据时,我想使用字典来访问我的队列。我无法完成这项工作。我已经尝试了很多东西。随着multiprocessing进口为mp

for key in all_keys: DICT[key] = mp.Queue在多处理模块(称为multi.py)导入的配置文件中那样定义dict不会返回错误,但是DICT[key]在进程之间不共享队列,每个进程似乎都有自己的队列副本,因此不会发生通信。

如果我尝试在定义DICT进程并启动它们的主要多处理函数的开始处定义,例如

DICT = mp.Manager().dict()    
for key in all_keys:
    DICT[key] = mp.Queue()

我得到错误

RuntimeError: Queue objects should only be shared between processes through
 inheritance

更改为

DICT = mp.Manager().dict()    
for key in all_keys:
    DICT[key] = mp.Manager().Queue()

只会使一切变得更糟。multi.py在main函数的顶部而不是内部尝试类似的定义会返回类似的错误。

必须有一种方法可以在进程之间共享许多队列,而无需在代码中明确命名每个队列。有任何想法吗?

编辑

这是程序的基本架构:

1-加载第一个模块,该模块定义一些变量,import
multi,launchsmulti.main()和加载另一个模块,该模块开始一系列模块加载和代码执行。与此同时…

2-multi.main看起来像这样:

def main():
    manager = mp.Manager()
    pool = mp.Pool()
    DICT2 = manager.dict()

    for key in all_keys:
        DICT2[key] = manager.Queue()
        proc_1 = pool.apply_async(targ1,(DICT1[key],) ) #DICT1 is defined in the config file
        proc_2 =  pool.apply_async(targ2,(DICT2[key],otherargs,)

除了使用pooland之外manager,我还启动了以下程序:

mp.Process(target=targ1,args=(DICT[key],))

3-该函数targ1接收key来自主进程的输入数据(按排序)。它旨在将结果传递给它,DICT[key]以便targ2可以执行其工作。这是不起作用的部分。有任意数量的targ1s,targ2s等,因此有任意数量的队列。

4-这些过程中的某些过程的结果将被发送到一堆不同的数组/熊猫数据帧,这些数据帧也由索引key,我希望可以从任意过程中访问它们,甚至是在不同模块中启动的过程。我还没有写这部分,这可能是一个不同的问题。(我在这里提到它是因为上面3的答案也可能很好地解决了4。)

linux – 在不相关的进程之间共享futex

linux – 在不相关的进程之间共享futex

不相关的流程如何使用futex进行合作?

假设我有不相关的进程,例如,一个是带有我的模块的apache子进程,另一个例如是后台脚本.

我想使用futex在两者之间建立一个带有互斥的条件变量,以便为用户空间快速代码路径带来好处.

在我看来,存储互斥锁的存储器可以在mmap的文件中,如果该存储器被映射,mlock’d这两个进程理论上可以针对相同的地址发出futex调用.

或者,也许可以使用FUTEX_FD将futex从一个进程传递到另一个进程.

代码提交接受低级,高级和动态语言(C,C,Python等).还必须支持“robust futex”API.

参考文献:

> https://www.kernel.org/doc/Documentation/robust-futexes.txt
> http://locklessinc.com/articles/mutex_cv_futex/

最佳答案
感谢Phillip和Felix M.指针.

Python用户代码(已存在数据结构的文件,使用PTHREAD_PROCESS_SHARED初始化)

with open("/tmp/semaphore","rb+") as f:
    m = mmap.mmap(f.fileno(),0)  # default: all file,share,read-write

data = ffi.cast("unsigned long[3]",id(m))[2]  # pointer to mapped area,64-bit cpython
lock = ffi.cast("pthread_mutex_t *",data)
cond = ffi.cast("pthread_cond_t *",data + 40)

@contextlib.contextmanager
def locked(alock):
    assert not C.pthread_mutex_lock(alock)
    try:
        yield
    finally:
        assert not C.pthread_mutex_unlock(alock)

等等醒来​​:

if "wait" in sys.argv:
    with locked(lock):
        assert not C.pthread_cond_wait(cond,lock)

elif "signal" in sys.argv:
    with locked(lock):
        assert not C.pthread_cond_signal(cond)

设置PTHREAD_PROCESS_SHARED的基础知识:

l = ffi.new("pthread_mutexattr_t *")
assert not C.pthread_mutexattr_init(l)
assert not C.pthread_mutexattr_setpshared(l,1)  # PTHREAD_PROCESS_SHARED
assert not C.pthread_mutex_init(lock,l)
# same for condition variable

针对nitpicks的完整代码:-) https://github.com/dimaqq/pthread_mutexattr_init/blob/master/xsem.py基于http://linux.die.net/man/3/pthread_mutexattr_init

python 30 进程之间的相互独立、进程之间的时间差

python 30 进程之间的相互独立、进程之间的时间差

# import time
# import os
# from multiprocessing import Process
#
# def func1():
# time.sleep(3)
# print(''我是func1'')
#
# def func2():
# time.sleep(3)
# print(''我是func2'')
# print(''子进程的pid'', os.getpid())
# print(''子进程的父进程'', os.getppid())
#
# start_time = time.time()
# func1()
# func2()
# end_time = time.time()
# print(''时间差>>>>'', end_time-start_time)
#
# if __name__ == ''__main__'':
# print(''主进程的pid'', os.getpid())
# start_time1 = time.time()
# p = Process(target=func2) # 创建一个func2的新进程、与func1同时进行(那个进程先结束就先显示)
# p.start()
#
# func1()
# end_time1 = time.time() # 由于你的程序是上往下走的在发起func2这个进程是func1已经走完
# print(''时间差>>>>'', end_time1 - start_time1) # 因此这个时间差实际上只是主程序运行的时间并没有考虑func2的运行时间
# **************************************分割线***************************************
# print(''分割线''.center(80, ''*''))

#真正的func1、func2并发的时间:由于func1/func2是同事进行的,算两个进程中运行的最长时间就是func1、func2的时间

# 两个进程的真正时间差

# from multiprocessing import Process
# import time
#
# def func1():
# time.sleep(1)
# print(''我是func1'')
# time1 = time.time()
# print(''我是时间1>>>'', time1)
#
#
# def func2():
# time.sleep(3)
# print(''我是func2'')
# time2 = time.time()
# print(''我是时间2>>>'', time2)
#
#
# if __name__ == ''__main__'':
# print(''我是主程序'')
# p = Process(target=func2)
# p.start()
#
# func1()
# end_time = time.time()
# func1、func2真正的时间差是打印出来的时间1 - 打印出来的时间2

# **************************************分割线***************************************
# 子进程和主进程在空间上是相互独立的
# from multiprocessing import Process
#
# x = 100
#
# def func():
# global x
# x = 10
# print(id(x))
# print(''子进程>>>'', x)
#
# if __name__ == ''__main__'':
#
# p = Process(target=func)
# p.start()
# p.join()
# print(id(x))
# print(''主进程>>>'', x)
# # 结果:
# # 263469456
# # 子进程>>> 10
# # 263470896
# # 主进程>>> 100 # 内存地址是相互独立的 且值是相互独立的

python multiprocessing - 在进程之间共享类字典,随后从进程写入反映到共享内存

python multiprocessing - 在进程之间共享类字典,随后从进程写入反映到共享内存

这是一个“陷阱”,它为外行人带来了很多惊喜。问题在于,当您拥有托管字典时,要查看传播的更新,您需要更改键或键的值。在这里,从技术上讲,您没有更改该值,也就是说,您仍在引用相同的对象实例(类型 ExampleClass),并且只更改了该引用中 的某些内容。很奇怪,我知道。这是您需要的修改后的方法 f

def f(self,dict):
    # generate a random index to add the class to
    index = str(random.randint(0,100))

    # create a new class at that index
    dict[index] = ExampleClass(str(random.randint(100,200)))

    # this is the problem,it doesn't share the updated variables in the dictionary between the processes <----------------------
    # attempt to change the created variables
    ec = dict[index]
    ec.count += 1
    ec.stringVar = "yeAH"
    dict[index] = ec # show new reference
    # print what's inside
    for x in dict.values():
        print(x.count,x.stringVar)

注意:

如果您使用以下代码来设置密钥/对值,以下内容实际上会打印 False

ec = ExampleClass(str(random.randint(100,200)))
dict[index] = ec
print(dict[index] is ec)

这就是为什么在修改后的方法 f 中,dict[index] = ec # show new reference 似乎是一个被设置为值的新引用。

此外,您应该考虑不使用 dict(一种内置数据类型)作为变量名。

python – 在进程对象之间共享SciPy稀疏数组

python – 在进程对象之间共享SciPy稀疏数组

我最近一直在学习 Python多处理,并遇到了障碍.我有一个稀疏的SciPy数组(CSC格式),我需要在5个工作进程之间以只读格式共享.我读过 this和 this(numpy-shared),但这似乎只适用于密集类型.

如何在不复制(或使用最少复制)5个多处理过程对象的情况下共享scipy.sparse.csc_matrix()?即使是numpy-shared方法似乎也需要复制整个数组,即便如此,我也不能将scipy.sparse转换为mp.Array().任何人都可以帮我指出正确的方向吗?

谢谢!

解决方法

我无法帮助您处理问题的多处理部分,但CSC稀疏矩阵只是三个numpy数组.您可以实例化另一个稀疏矩阵b,通过执行以下操作来共享与稀疏矩阵相同的内存对象:
import scipy.sparse as sps

b = sps.csc_matrix((a.data,a.indices,a.indptr),shape=a.shape,copy=False)

a.data,a.indices和a.indptr是你想要在你的进程之间共享的三个numpy数组,如果你能这样做,那么在每个进程中实例化一个稀疏矩阵将是一个廉价的操作.

今天关于在Python中的进程之间共享许多队列在python中的进程之间共享许多队列的介绍到此结束,谢谢您的阅读,有关linux – 在不相关的进程之间共享futex、python 30 进程之间的相互独立、进程之间的时间差、python multiprocessing - 在进程之间共享类字典,随后从进程写入反映到共享内存、python – 在进程对象之间共享SciPy稀疏数组等更多相关知识的信息可以在本站进行查询。

本文标签: