如果您对使用multiprocessing.Process并发进程数最多感兴趣,那么本文将是一篇不错的选择,我们将为您详在本文中,您将会了解到关于使用multiprocessing.Process并发
如果您对使用multiprocessing.Process并发进程数最多感兴趣,那么本文将是一篇不错的选择,我们将为您详在本文中,您将会了解到关于使用multiprocessing.Process并发进程数最多的详细内容,我们还将为您解答并发进程中用于实现进程互斥的程序段的相关问题,并且为您提供关于(并发编程)进程 (multiprocessing--Process实现进程并发)、112 Python程序中的进程操作-开启多进程(multiprocess.process)、day 32 multiprocessing 模块中 Process 方法,僵尸进程,孤儿进程,守护进程、multiprocessing 多进程模块的有价值信息。
本文目录一览:- 使用multiprocessing.Process并发进程数最多(并发进程中用于实现进程互斥的程序段)
- (并发编程)进程 (multiprocessing--Process实现进程并发)
- 112 Python程序中的进程操作-开启多进程(multiprocess.process)
- day 32 multiprocessing 模块中 Process 方法,僵尸进程,孤儿进程,守护进程
- multiprocessing 多进程模块
使用multiprocessing.Process并发进程数最多(并发进程中用于实现进程互斥的程序段)
我有Python
代码:
from multiprocessing import Process
def f(name):
print 'hello',name
if __name__ == '__main__':
for i in range(0,MAX_PROCESSES):
p = Process(target=f,args=(i,))
p.start()
运行良好。但是,MAX_PROCESSES
是变量,可以是1
和之间的任何值512
。由于我仅在具有8
内核的机器上运行此代码,因此我需要确定是否有可能限制允许同时运行的进程数。我已经调查过multiprocessing.Queue
,但看起来并不需要我的东西-
也许是我对文档的解释不正确。
有没有一种方法可以限制同时multiprocessing.Process
运行的数量?
(并发编程)进程 (multiprocessing--Process实现进程并发)
并发的本质:切换+保持状态
一、同一个程序执行多次是多个进程
每一个进程有一个PID
import os
os.getppid() #父的pid (pycharm.exe)#cmd 中 pyhon 路径 父的pid(cmd.exe)
os.getpid() #自己的pid (python.exe)
windows (createprocess) 创建子进程时子辈除了拷贝父辈的信息还创建了些自己的东西
unix (fork) 创建子进程时拷贝父辈的信息, 子进程的初始状态和父辈一致
from multiprocessing import Process
import time
def task(name):
print(''%s is running'' %name)
time.sleep(3)
print(''%s is done'' %name)
if __name__ == ''__main__'':
# 在windows系统之上,开启子进程的操作一定要放到这下面
# Process(target=task,kwargs={''name'':''egon''}) #两种传参方式皆可
p=Process(target=task,args=(''egon'',)) #两种传参方式皆可
p.start() # 向操作系统发送请求,操作系统会申请内存空间,把父进程的数据拷贝给子进程,作为子进程的初始状
print(''======主'')
第二种
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self,name):
super(MyProcess,self).__init__() #Process在init里面有相应设置,要遗传下来,否则报错
self.name=name
def run(self):
print(''%s is running'' %self.name)
time.sleep(3)
print(''%s is done'' %self.name)
if __name__ == ''__main__'':
p=MyProcess(''egon'')
p.start() #p.start()调用了类中的run()方法(规定)
print(''主'')
from multiprocessing import Process
import time
x=1000
def task():
time.sleep(3)
global x
x=0
print(''儿子死啦'',x)
if __name__ == ''__main__'':
p=Process(target=task)
p.start()
time.sleep(5)
print(x)
#在子进程中对变量x的修改不影响父进程中x的值
#了解 连续start再连续join 和 连续start,join。。。。
from multiprocessing import Process
import time
x=1000
def task(n):
print(''%s is runing'' %n)
time.sleep(n)
if __name__ == ''__main__'':
start_time=time.time()
p1=Process(target=task,args=(1,))
p2=Process(target=task,args=(2,))
p3=Process(target=task,args=(3,))
p1.start()
p2.start()
p3.start()
p3.join() #3s
p1.join()
p2.join()
print(''主'',(time.time() - start_time)) #3.01637601852417
from multiprocessing import Process
import time
x=1000
def task(n):
print(''%s is runing'' %n)
time.sleep(n)
if __name__ == ''__main__'':
start_time=time.time()
p_l=[]
for i in range(1,4):
p=Process(target=task,args=(i,))
p_l.append(p)
p.start()
for p in p_l:
p.join()
print(''主'',(time.time() - start_time)) #3.0141923427581787
from multiprocessing import Process
import time
def task(n):
print(''%s is runing'' %n)
time.sleep(n)
if __name__ == ''__main__'': # 在windows系统之上,开启子进程的操作一定要放到这下面
start_time=time.time()
p1=Process(target=task,args=(1,),name=''任务1'')
p1 daemon=True #再obj发出创建前,将obj变成守护进程,主进程执行完毕后子进程跟着结束
p1.start() #向操作系统发创建子进程请求,父进程无需等待
print(p1.pid)
print(p1.name) #如前面不定义name,默认process-1 etc
p1.terminate() #向操作系统发请求,父进程无需等待
p1.join() #等待该子进程结束(卡住吧),父进程需要等待一点时间
print(p1.is_alive())
print(''主'') #这里的效果是主进程等儿子死了,再结束
import time,os
def task():
print(''self:%s parent:%s'' %(os.getpid(),os.getppid()))
time.sleep(3)
if __name__ == ''__main__'':
p1=Process(target=task,)
p1.start()
print(p1.pid)
print(''主'',os.getpid())
在unix系统中init是所有进程的爹;创建进程用fork,回收进程(结束进程的残留信息?)用waitpid
僵尸进程(有害:占用pid):子代先于父代终结,其部分信息(pid等)没有从系统中删除,需要父代回收。join中含有回收子代信息的功能。
孤儿进程(无害):父代先于子代终结,子代终结后的部分信息由init代收。
import time,os
def task(n):
print(''%s is running'' %n)
time.sleep(n)
if __name__ == ''__main__'':
1=Process(target=task,args=(1,))
p1.start()
p1.join() # join中含有回收子代信息的功能(wait)
print(''======主'',os.getpid())
七、守护进程
from multiprocessing import Process
import time
print(''%s is running'' % name)
time.sleep(3)
obj = Process(target=task, args=(''egon'',))
obj.daemon=True #将obj变成守护进程,主进程执行完毕后子进程跟着结束
obj.start() # 发送信号给操作系统
print(''主'')
强调:必须是lock.acquire()一次,然后 lock.release()释放一次,才能继续lock.acquire(),不能连续的lock.acquire()。否者程序停在原地。
抢同一把锁:生成锁对象 必须放在if __name__==''__main__'': 下面
互斥锁vs join:
大前提:二者的原理都是一样,都是将并发变成串行,从而保证有序(在多个程序共享一个资源时,为保证有序不乱,需将并发变成串行)
场景: 修改公共数据,如果并发会发出数据错乱,这时就必须串行执行,就要用到互斥锁
区别一:join是按照人为指定的顺序执行,而互斥锁是所以进程平等地竞争,谁先抢到谁执行
区别二:互斥锁可以让一部分代码串行,而join只能将代码整体串行(详见抢票系统)
理解:首先join肯定是一个子进程对象,互斥锁可以加在子进程对象代码任意部位(要释放额)
join人为安排子进程执行顺序,互斥锁是公平竞争。
from multiprocessing import Process,Lock
import time,random
print(id(mutex))
lock.acquire()
# print(''task1:名字是egon'')
# time.sleep(random.randint(1,3))
# print(''task1:性别是male'')
time.sleep(random.randint(1,3)) #with mutex: time.sleep(random.randint(1,3))
# print(''task1:年龄是18'')
lock.release()
lock.acquire()
print(''task2:名字是alex'')
time.sleep(random.randint(1,3))
print(''task2:性别是male'')
time.sleep(random.randint(1,3))
print(''task2:年龄是78'')
lock.release()
lock.acquire()
print(''task3:名字是lxx'')
time.sleep(random.randint(1,3))
print(''task3:性别是female'')
time.sleep(random.randint(1,3))
print(''task3:年龄是30'')
lock.release()
p1=Process(target=task1,args=(mutex,))
p2=Process(target=task2,args=(mutex,))
p3=Process(target=task3,args=(mutex,))
p2.start()
p3.start()
import json
import time
import random
import os
from multiprocessing import Process,Lock
time.sleep(random.randint(1,3))
with open(''db.json'',''r'',encoding=''utf-8'') as f:
dic=json.load(f)
print(''%s 剩余票数:%s'' %(os.getpid(),dic[''count'']))
with open(''db.json'',''r'',encoding=''utf-8'') as f:
dic=json.load(f)
if dic[''count''] > 0:
dic[''count'']-=1
time.sleep(random.randint(1,3))
with open(''db.json'',''w'',encoding=''utf-8'') as f:
json.dump(dic,f)
print(''%s 购票成功'' %os.getpid())
search()
lock.acquire()
get()
lock.release()
for i in range(10):
p=Process(target=task,args=(mutex,)) #这里子进程用的都是主进程这一把锁 id
p.start()
112 Python程序中的进程操作-开启多进程(multiprocess.process)
目录
- 一、multiprocess模块
- 二、multiprocess.process模块
-
三、process模块
- 3.1 方法介绍
- 3.2 属性介绍
- 3.3 在windows中使用Process类的注意事项
-
四、process类的使用
- 4.1 创建并开启子进程的两种方式
- 4.2 join方法
- 4.3 查看主进程和子进程的进程号
- 4.4 查看进程名和进程状态、设置进程名
- 4.5 terminate结束子进程
- 4.6 Process中的守护进程
-
五、socket聊天并发实例
- 5.1 使用多进程实现socket聊天并发-server端
- 5.2 使用多进程实现socket聊天并发-client端
运行中的程序就是一个进程。所有的进程都是通过它的父进程来创建的。因此,运行起来的python程序也是一个进程,那么我们也可以在程序中再创建进程。多个进程可以实现并发效果,当我们的程序中存在多个进程的时候,在某些时候,就会让程序的执行速度变快。
在linux c语言中创建线程使用的fork函数,而Python就需要借助响应的模块
一、multiprocess模块
仔细说来,multiprocess不是一个模块而是python中一个操作、管理进程的包。 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。由于提供的子模块非常多,为了方便大家归类记忆,我将这部分大致分为四个部分:创建进程部分,进程同步部分,进程池部分,进程之间数据共享。
二、multiprocess.process模块
process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。
三、process模块
Process([group [,target [,name [,args [,kwargs]]]]])
,由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
强调:
- 需要使用关键字的方式来指定参数
- args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
参数介绍:
- group参数未使用,值始终为None
- target表示调用对象,即子进程要执行的任务
- args表示调用对象的位置参数元组,
args=(1,2,‘egon‘,)
- kwargs表示调用对象的字典,
kwargs={‘name‘:‘egon‘,‘age‘:18}
- name为子进程的名称
3.1 方法介绍
-
p.start()
:启动进程,并调用该子进程中的p.run() -
p.run()
:进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法 -
p.terminate()
:强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁 -
p.is_alive()
:如果p仍然运行,返回True -
p.join([timeout])
:主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
3.2 属性介绍
-
p.daemon
:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()
之前设置 -
p.name
:进程的名称 -
p.pid
:进程的pid -
p.exitcode
:进程在运行时为None、如果为–N,表示被信号N结束(了解即可) -
p.authkey
:进程的身份验证键,默认是由os.urandom()
随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
3.3 在windows中使用Process类的注意事项
在Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),在创建子进程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。因此如果将process()直接写在文件中就会无限递归创建子进程报错。所以必须把创建子进程的部分使用if __name__ ==‘__main__‘
判断保护起来,import 的时候,就不会递归运行了。
四、process类的使用
在一个python进程中开启子进程,start方法和并发效果。
4.1 创建并开启子进程的两种方式
-
方式一:通过函数
from multiprocessing import Process import time ''' 开启子进程的两种方式: 1. 通过函数 2. 通过类。继承Process ''' def task(name): print("进程{%s} start"%(name)) time.sleep(2) print(f"进程{name} end") # 必须加main if __name__ == '__main__': ######## 方式1(通过函数) p = Process(target=task,args=("xc",)) # args用于传参,是个元祖,必须加逗号 p.start() # 告诉操作系统启动子进程,但一定是父进程先执行,多个子进程的执行顺序是根据操作系统调度决定的 print('主进程/父进程') print('主进程/父进程') p1 = Process(target=task,args=("cyx",)) p1.start() # 告诉操作系统启动子进程,但一定是父进程先执行,多个子进程的执行顺序是根据操作系统调度决定的 print('主进程/父进程') print('主进程/父进程') print('主进程/父进程') print('主进程/父进程')
-
方式二:通过类。继承Process
from multiprocessing import Process import time ''' 开启子进程的两种方式: 1. 通过函数 2. 通过类。继承Process ''' class myProcess(Process): def __init__(self,name): # self.name = name #错误 ### 这样没有给对象添加属性name,而是在修改父类的进程名(name) # 父类Process的进程名也是name super().__init__() # 调用父类super().init方法完成创建进程初始化,重新给name属性赋值了。 self.name = name ## 在父类的init方法后设置name,才是为自己对象添加属性 # super().__init__(name=name) # 调用父类super().init,并设置进程名(name) def run(self): # 创建进程会默认调用run方法 print("进程%s start" % (self.name)) time.sleep(2) print(f"进程{self.name} end") # 必须加main if __name__ == '__main__': p = myProcess("xc") p.start() print('主进程/父进程') print('主进程/父进程') p1 = myProcess("cyx") p1.start() # 告诉操作系统启动子进程,但一定是父进程先执行,多个子进程的执行顺序是根据操作系统调度决定的 print('主进程/父进程') print('主进程/父进程') print('主进程/父进程') print('主进程/父进程')
4.2 join方法
join方法用于回收子进程
from multiprocessing import Process import time def foo(x): print('进程 start ') time.sleep(x) print('进程 end ') if __name__ == '__main__': ### 串行执行和回收子进程 # p = Process(target=foo,args=(1,)) # p2 = Process(target=foo,args=(2,)) # p3 = Process(target=foo,args=(3,)) # p.start() # # p.join() # 阻塞住主进程再等待子进程结束,然后再往下执行,(了解的是:内部会待用wait()) # p2.start() # p2.join() # p3.start() # p3.join() # print('主') # 并发执行进程,并依次回收 p = Process(target=foo,)) p2 = Process(target=foo,)) p3 = Process(target=foo,)) # 开启进程 p.start() p2.start() p3.start() # 回收进程 p.join() # 阻塞住主进程再等待子进程结束,然后再往下执行,(了解的是:内部会待用wait()) p2.join() p3.join() print('主')
4.3 查看主进程和子进程的进程号
from multiprocessing import Process,current_process import time import os ''' 查看主进程和子进程的进程号 1. 通过os.getpid()方法 2. 通过multiprocessing模块中的current_process().pid ''' def task(name,x): print("当前进程pid:",current_process().pid) print(f"{name} start") time.sleep(x) print(f"{name} end") if __name__ == '__main__': p = Process(target=task,args=("进程1",1)) p.start() # 方式一 print("子进程pid:",p.pid) # 方式二 # print("当前进程pid:",current_process().pid) print("当前进程pid",os.getpid()) print("主进程的父进程pid",os.getppid()) # 实际上是pycharm的进程号 print()
4.4 查看进程名和进程状态、设置进程名
''' process设置名字: name属性 process判断进程是否存在:is_alive ''' from multiprocessing import Process import time def task(x): print("进程 start") time.sleep(x) print("进程 end") if __name__ == '__main__': p = Process(target=task,)) p.start() p.name = "进程1" print(p.name) print("子进程是否存在:",p.is_alive()) # True time.sleep(2) # 延时2秒等待子进程结束 print("子进程是否存在:",p.is_alive()) # False print("主进程")
4.5 terminate结束子进程
''' terminate() 告诉子进程让他结束 ''' from multiprocessing import Process import time def task(x): print("进程 start") time.sleep(x) print("进程 end") if __name__ == '__main__': p = Process(target=task,args=(10,)) p.start() p.terminate() # 告诉子进程让他提前结束 p.name = "进程1" print(p.name) print("子进程是否存在:",p.is_alive()) # True p.join() print("子进程是否存在:",p.is_alive()) # False print("主进程")
4.6 Process中的守护进程
首先,博主自己测试,实验。Process中守护进程的部分和真正的守护进程概念并不一样,因此只需要知道Process的守护进程即可。
''' daemon = True 把子进程变为守护进程 主进程的代码执行完毕守护进程直接结束。但如果子进程代码结束也会结束 ''' from multiprocessing import Process import time def task(x): print("进程 start") time.sleep(x) print("进程 end") if __name__ == '__main__': p = Process(target=task,)) p.daemon = True # 把子进程变为守护进程 p.start() # print(p.pid) p.name = "进程1" print(p.name) print("子进程pid:",p.pid) # print("子进程是否存在:",p.is_alive()) # True time.sleep(3) print("子进程是否存在:",p.is_alive()) # False print("主进程") print("子进程是否存在:",p.is_alive()) # False time.sleep(200)
五、socket聊天并发实例
5.1 使用多进程实现socket聊天并发-server端
import socket from multiprocessing import Process def talk(conn,client_addr): while 1: msg = conn.recv(1024) if not msg: break print(msg.decode("utf8")) conn.send(msg.upper()) print(111) if __name__ == '__main__': # 必须要写在里面,不然会因为创建子线程重复调用导致端口被占用 server = socket.socket(socket.AF_INET,socket.soCK_STREAM) server.bind(("127.0.0.1",8087)) server.listen(5) while 1: print("等待连接") conn,addr = server.accept() print(addr,"连接成功") p = Process(target=talk,args=(conn,addr)) p.start()
5.2 使用多进程实现socket聊天并发-client端
import socket if __name__ == '__main__': client = socket.socket() client.connect(("127.0.0.1",8087)) while 1: msg = input("请输入内容") client.send(msg.encode("utf8")) msg = client.recv(1024).decode("utf8") print(msg)
day 32 multiprocessing 模块中 Process 方法,僵尸进程,孤儿进程,守护进程
multiprocess 模块
仔细说来,multiprocess 不是一个模块而是 python 中一个操作、管理进程的包。 之所以叫 multi 是取自 multiple 的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。由于提供的子模块非常多,为了方便大家归类记忆,我将这部分大致分为四个部分:创建进程部分,进程同步部分,进程池部分,进程之间数据共享。
进程的 pid 父进程的 ppid
import time
import os
a = 1
b = 2
print(''子进程:'',os.getpid()) #python解释器
print(''父进程:'',os.getppid()) #pycharm软件
time.sleep(20)
print(a + b)
创建进程的两种方式
#方法一
#大多数都使用方法一创建一个子进程
from multiprocessing import Process
def task(n): #task 固定写法
print("开始创建进程.............",n)
#windows环境下想开启子进程一定要 __name__ == ''__main__''
if __name__ == ''__main__'':
p = Process(target=task,args=(1,),kwargs={"n":1}) #task 固定写法,创建对象时"task"不能加"()",args表示位置参数的,kwargs表示关键字参数
p.start() #起动一个子进程
#方法2
class MyProcess(Process): #类的写法主要是直接继承"Process" 这个类.
def __init__(self,n):
super().__init__() #必须先要执行父类的 __init__属性,也就是"Process",要不然会报错
self.n = n
def run(self): #子进程的代码全部都要写在这里面
a = 1
b = 2
print("%s is runing" % self.n)
if __name__ == ''__main__'':
p = MyProcess("alex")
p.start()
print("我是主")
验证进程间的内存隔离
from multiprocessing import Process
import time
x = 1000
def task():
time.sleep(2)
global x
x = 2
print("子进程:",x)
if __name__ == ''__main__'':
p = Process(target=task,)
p.start()
time.sleep(5)
print("主:",x)
主进程在子程序运行完之后运行
from multiprocessing import Process
def task(n):
print("我是子进程开始运行了.......",n)
time.sleep(5)
print("子进程运行结束")
if __name__ == ''__main__'':
p = Process(target=task,args=(1,),name="子进程") #默认参数不变写法 args=(1,) 后面必须有逗号 name定义子进程的名字
p.start()
p.join() #join 上面的子进程运行完毕之后运行主进程
print("我是主进程")
在一个主进程开启多个子进程
def task(n):
print(''%s is begin'' % n)
time.sleep(3)
print(''%s is over'' % n)
def main():
print(''主进程执行.....'')
if __name__ == ''__main__'':
p1 = Process(target=task,args=(''p1'',))
p2 = Process(target=task,args=(''p2'',))
p3 = Process(target=task,args=(''p3'',))
p1.start() # 发送一个请求
p2.start() # 发送一个请求
p3.start() # 发送一个请求
main()
for 循环创建子进程
from multiprocessing import Process
import time
def task(n):
print("%s 开始运行" % n)
time.sleep(3)
print("%s 运行结束了" % n)
if __name__ == ''__main__'':
for i in range(1,4):
p = Process(target=task,args=("p%s" % i,))
p.start()
子进程和主进程的运行顺序 "串行" 和 "并发"
串行消耗时间长,并发消耗时间短
##串行##
from multiprocessing import Process
import time
def task(n):
print(''%s is begin'' % n)
time.sleep(3)
print(''%s is over'' % n)
def main():
print(''主进程执行.....'')
if __name__ == ''__main__'':
# 创建三个进程对象
p1 = Process(target=task, args=(''p1'',))
p2 = Process(target=task, args=(''p2'',))
p3 = Process(target=task, args=(''p3'',))
start_time = time.time()
# 发起三个请求
p1.start()
p1.join() # 高速我的主进程,你要在我执行完毕之后在运行
# print(111)
p2.start()
p2.join()
p3.start()
p3.join()
# # 如果你要按照上面的写法:串行.
print(time.time() - start_time)
main()
##并发##
from multiprocessing import Process
import time
def task(n):
print(''%s is begin'' % n)
time.sleep(n)
print(''%s is over'' % n)
def main():
print(''主进程执行.....'')
if __name__ == ''__main__'':
# 创建三个进程对象
p1 = Process(target=task, args=(1,))
p2 = Process(target=task, args=(2,))
p3 = Process(target=task, args=(3,))
start_time = time.time()
# 发起三个请求
p1.start()
p2.start()
p3.start()
p1.join() # 1
p2.join() # 2
p3.join() # 3
print(time.time() - start_time)
main()
for 循环实现主程序等待等待所有子程序结束之后再运行
from multiprocessing import Process
import time
def task(n):
print("%s开始运行" % n)
# time.sleep(3)
print("%s运行结束" % n)
if __name__ == ''__main__'':
lst = []
for i in range(1,4):
p = Process(target=task,args=("p%s" % i,))
p.start()
p.join() #这里使用join是为了让数字顺序拼接上"1,2,3","p1开始运行,p2开始运行,p3开始运行"
lst.append(p) # 把子进程添加到列表内不让在外部执行,如果不添加到列表内在外执行之后就变成了串行.
for c in lst: # 拿到列表内的子进程
c.join() #通过join执行下面的主程序
print("我是主程序")
总结:
1,多个进程使用join,他们之间互不影响.
p.join() 会将除join方法以外的方法视为主进程的方法(视为串行). 比如:p.start
进程和对象的其他参数
from multiprocessing import Process
import time
import os
def task(n):
print(''%s is begin'' % n)
# print(os.getpid())
print(''子进程:'', os.getpid(), ''主:'', os.getppid())
time.sleep(3)
print(''%s is over'' % n)
def main():
print(''主进程执行.....'')
# if __name__ == ''__main__'':
# p1 = Process(target=task, args=(''p1'',),name=''紫禁城'') # 自定制别名
# p2 = Process(target=task, args=(''p2'',))
# p1.start()
# p2.start()
# print(p1.name) # 为进程起别名
# print(p2.name)
# p1.start() # 给操作系统发送请求
# p1.terminate() # 杀死进程
# # time.sleep(3)
# # p1.join()
# time.sleep(1)
# print(p1.is_alive()) # 判断进程死活
# 判断id
# p1 = Process(target=task,args=(1,))
# p1.start()
# # print(''子进程:'',p1.pid) # 获取p1进程pid
# print(''子进程:'',p1.pid,''主:'',os.getpid())
僵尸进程和孤儿进程
僵尸进程一般针对 linux 系统,当一个主进程中的子进程在运行完毕之后,就会产生僵尸进程,但是 liux 底层有垃圾回收机制 (waitpid, 内核层面), 会默认将僵尸进程回收,如果主进程产生了大量的子进程回收不及时可能会影响的系统性能.
僵尸进程: 占有一点空间,保存 pid, 运行时间,状态,僵尸进程的回收是由主进程发起的.
孤儿进程: 当你的主进程意外挂了,就会形成孤儿进程,孤儿进程就会交给 linux 系统来处理.
守护进程
守护进程会等待主进程的代码结束之后就立即结束
守护进程也是一个子进程,主进程永远要在子进程结束之后才能使用,因为主进程要负责回收子进程
from multiprocessing import Process
import time
def task(n):
print(''%s is begin'' % n)
time.sleep(3)
print(''%s is over'' % n)
def main():
print(''主进程执行.....'')
if __name__ == ''__main__'':
# 创建三个进程对象
p1 = Process(target=task, args=(''p1'',))
p1.daemon = True # 你的p1进程就设置成了守护进程,便不会再执行p1的内容了.
p1.start()
time.sleep(2)
main()
multiprocessing 多进程模块
multiprocessing 多进程模块
前言
其实multiprocessing
模块与threading
模块的接口都非常相似,但是有一些地方有一些细微差别。所以本文是基于前面的threading
模块的一些知识对multiprocessing
模块进行讲解的。
他们的主要区别有以下几点
1. 创建子进程的方式针对不同平台有着差异化
2. 关于守护线程的设置接口是
setDaemon(True)
,而关于守护进程的接口是deamon = True
3.
multiprocessing
模块下的获取进程名与设置进程名没有threading
模块下的getName()
和setName()
,而是直接采取属性name
进行操作4. 多进程中数据共享不能使用普通的
queue
模块下提供的队列进行数据共享,而应使用multiprocessing
中提供的Queue
5.
multiprocessing
模块下中提供的Queue
先进先出队列没有task_done()
与join()
,他们都在JoinableQueue
中,并且该模块下没有提供LifoQueue
后进先出队列与PriorityQueue
优先级队列
官方中文文档
threading 模块基本使用
多进程与多线程工作的区别
多线程工作方式
多线程的工作方式实际上在第一篇的时候,我们已经说过了。因为线程必须存在于进程之中,是最小的执行单元,所以你可以将它如此理解:
其实就是不断的往进程这个小房间加人,那么它的优点如下:
开一条新的线程比开一条新的进程开销要小很多
并且对于线程的切换来说代价也要小很多
多条线程共有该进程下的所有资源,数据共享比较容易实现
而 CPython 由于 GIL 锁的设定,所以它的多线程是残缺不全的,因此在很多时候我们依然要用到多进程,虽然这种情况比较少。
多进程工作方式
其实就是不断的造出一模一样的小房间,那么它的优点如下:
虽然说,新开一条进程比新开一条线程的代价大很多,但是由于 CPython 中 GIL 锁的设定想在多线程的情况下实现并行是不可能的,只有多进程才能够实现并行。
可以说是唯一优点了,但是我们依然要学习一下multiprocessing
模块,它的学习代价并不是很大,所以接下来正式进入multiprocessing
模块的学习。
基本使用
针对不同平台的进程启动方式
对于进程启动方式来说,其实multiprocessing
模块中对于不同平台下有不同的启动方式。如下:
spawn
:这玩意儿相当于创建了一个新的解释器进程,对比其他两种方法,这种方法速度上比较慢,但是它是 Windows 平台下默认的启动方式(Unix 系统下可用)。并且在 windows 平台下,我们应该在if __name__ == ''__main__''
下进行新进程的启动。但是我依然认为不管在哪个平台下不论线程还是进程都应该在if __name__ == ''__main__''
这条语句下启动。
fork
:这种启动方式是通过os.fork()
来产生一个新的解释器分叉,是 Unix 系统的默认启动方式。
forkserver
:这个我也看不太明白,直接把官方文档搬过来。如果有懂的大神可以解释一下。程序启动并选择
forkserver
启动方法时,将启动服务器进程。从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程。分叉服务器进程是单线程的,因此使用os.fork()
是安全的。没有不必要的资源被继承。可在 Unix 平台上使用,支持通过 Unix 管道传递文件描述符。
import multiprocessing as mp
def foo(q):
q.put(''hello'')
if __name__ == ''__main__'': # <--- 强烈注意!在windows平台下开多进程一定要在该条语句之下,否则会抛出异常!!
mp.set_start_method(''spawn'') # 选择启动方式
q = mp.Queue() # 实例化出用于进程间数据共享的管道
p = mp.Process(target=foo, args=(q,))
p.start() # 启动进程任务,等待CPU调度执行
print(q.get()) # 从管道中拿出数据
p.join() # 阻塞至子进程运行完毕
实例化 Process 类创建子进程
其实感觉上面的方法都已经将本章要写的内容举例了一个七七八八,但是我们接着往下看。与threading
模块中创建多线程的方式一样,multiprocessing
模块创建多进程的方式也有两种,所以我们将之前的示例拿过来直接改一改就好。
import multiprocessing
import time
print("主进程任务开始处理")
def task(th_name):
print("子进程任务开始处理,参数:{0}".format(th_name))
time.sleep(3)
print("子进程任务处理完毕")
if __name__ == ''__main__'': # <--- Windows平台下必须在该条语句下执行
# ==== 实例化出Process类并添加子进程任务以及参数 ====
p1 = multiprocessing.Process(target=task, args=("进程[1]",)) # <-- 参数必须添加逗号。因为是args所以会打散,如果不加逗号则不能进行打散会抛出异常
p1.start() # 等待CPU调度..请注意这里不是立即执行
print("主进程任务处理完毕")
# ==== 执行结果 ====
"""
主进程任务开始处理
主进程任务处理完毕
主进程任务开始处理
子进程任务开始处理,参数:进程[1]
子进程任务处理完毕
"""
这里我们看执行结果,主进程任务开始处理
打印了两次,而主进程任务处理完毕
打印了一次,这是为什么呢?由于我们是在Windows
平台下,所以它默认的进程启动方式为spawn
,即创建了一个新的解释器进程并开始执行,所以上面的主进程任务开始处理
就打印了两次,一次是主进程,一次是新创建的子进程。而下面由于if __name__ == ''__main__'':
这条语句,子进程并不会执行该语句下面的代码块,所以主进程任务处理完毕
就只打印了一次。
自定义类继承 Process 并覆写 run 方法
import multiprocessing
import time
print("主进程任务开始处理")
class Processing(multiprocessing.Process):
"""自定义类"""
def __init__(self, th_name):
self.th_name = th_name
super(Processing, self).__init__()
def run(self):
print("子进程任务开始处理,参数:{0}".format(self.th_name))
time.sleep(3)
print("子进程任务处理完毕")
if __name__ == ''__main__'':
p1 = Processing("进程[1]")
p1.start() # 等待CPU调度..请注意这里不是立即执行
print("主进程任务处理完毕")
# ==== 执行结果 ====
"""
主进程任务开始处理
主进程任务处理完毕
主进程任务开始处理
子进程任务开始处理,参数:进程[1]
子进程任务处理完毕
"""
multiprocessing 方法大全
multiprocessing
模块中的方法参考了thrading
模块中的方法。但是我们一般用下面两个方法就够了,他们都可以拿到具体的进程对象。
multiprocessing 模块方法大全 | |
---|---|
方法 / 属性名称 | 功能描述 |
multiprocessing.active_children() | 查看当前进程存活了的所有子进程对象,以列表形式返回。 |
multiprocessing.current_process() | 获取当前进程对象。 |
进程对象方法大全
进程对象方法大全(即 Process 类的实例对象) | |
---|---|
方法 / 属性名称 | 功能描述 |
start() | 启动进程,该方法不会立即执行,而是告诉 CPU 自己准备好了,可以随时调度,而非立即启动。 |
run() | 一般是自定义类继承Process 类并覆写的方法,即线程的详细任务逻辑。 |
join(timeout=None) | 主进程默认会等待子进程运行结束后再继续执行,timeout 为等待的秒数,如不设置该参数则一直等待。 |
name | 可以通过 = 给该进程设置一个通俗的名字。如直接使用该属性则返回该进程的默认名字。 |
is_alive() | 查看进程是否存活,返回布尔值。 |
daemon | 可以通过 = 给该进程设置一个守护进程。如直接使用该属性则是查看进程是否为一个守护进程,返回布尔值。默认为False 。 |
pid | 返回进程 ID。在生成该进程之前,这将是 None 。 |
exitcode | 子进程的退出代码。如果进程尚未终止,这将是 None 。负值 -N 表示子进程被信号 N 终止。 |
authkey | 进程的身份验证密钥(字节字符串)。 |
sentinel | 系统对象的数字句柄,当进程结束时将变为 "ready" 。 |
terminate() | 终止进程。 |
kill() | 同上 |
close() | 关闭 Process 对象,释放与之关联的所有资源。如果底层进程仍在运行,则会引发 ValueError 。一旦 close() 成功返回, Process 对象的大多数其他方法和属性将引发 ValueError 。 |
注意 start() 、 join() 、 is_alive() 、 terminate() 和 exitcode 方法只能由创建进程对象的进程调用。 |
进程对象的好伙伴(即 Process 类的实例对象) | |
---|---|
os.getpid() | 返回进程 ID。 |
与 threading 模块的接口异同
守护进程 daemon
import multiprocessing
import time
print("主进程任务开始处理")
def task(th_name):
print("子进程任务开始处理,参数:{0}".format(th_name))
time.sleep(3)
print("子进程任务处理完毕")
if __name__ == ''__main__'':
p1 = multiprocessing.Process(target=task, args=("进程[1]",))
p1.daemon = True # <-- 设置进程对象p1为守护进程,注意这一步一定要放在start之前。
p1.start() # 等待CPU调度..请注意这里不是立即执行
time.sleep(2)
print("主进程任务处理完毕")
# ==== 执行结果 ==== # print("子进程任务处理完毕") 可以看到该句没有执行
"""
主进程任务开始处理
主进程任务开始处理
子进程任务开始处理,参数:进程[1]
主进程任务处理完毕
"""
设置与获取进程名
import multiprocessing
import time
print("主进程任务开始处理")
def task(th_name):
print("子进程任务开始处理,参数:{0}".format(th_name))
obj = multiprocessing.current_process() # 获取当前进程对象
print("获取当前的进程名:{0}".format(obj.name))
print("开始设置进程名")
obj.name = "yyy"
print("获取修改后的进程名:{0}".format(obj.name))
time.sleep(3)
print("子进程任务处理完毕")
if __name__ == ''__main__'':
# ==== 第一步:实例化出Process类并添加子进程任务以及参数 ====
t1 = multiprocessing.Process(target=task, args=("进程[1]",),name="xxx")
t1.start() # 等待CPU调度..请注意这里不是立即执行
print("主进程名:",multiprocessing.current_process().name) # 直接使用属性 name
print("主进程任务处理完毕")
# ==== 执行结果 ====
"""
主进程任务开始处理
主进程名: MainProcess
主进程任务处理完毕
主进程任务开始处理
子进程任务开始处理,参数:进程[1]
获取当前的进程名:xxx
开始设置进程名
获取修改后的进程名:yyy
子进程任务处理完毕
"""
锁相关演示
锁的使用和threading
模块中锁的使用相同,所以我们举例一个Lock
锁即可。
import multiprocessing
lock = multiprocessing.Lock() # 实例化同步锁对象 # 注意!!! 在Windows平台下,我们应该将锁的实例化放在上面,这样子进程才能拿到锁对象。否则就会抛出异常!!!或者也可以将锁对象传入当做形参进行传入,二者选其一
num = 0
def add():
lock.acquire() # 上锁
global num
for i in range(10000000): # 一千万次
num += 1
lock.release() # 解锁
def sub():
lock.acquire() # 上锁
global num
for i in range(10000000): # 一千万次
num -= 1
lock.release() # 解锁
if __name__ == ''__main__'':
t1 = multiprocessing.Process(target=add, )
t2 = multiprocessing.Process(target=sub, )
t1.start()
t2.start()
t1.join()
t2.join()
print("最终结果:", num)
# ==== 执行结果 ==== 三次采集
"""
最终结果: 0
最终结果: 0
最终结果: 0
"""


from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print(''hello world'', i)
finally:
l.release()
if __name__ == ''__main__'':
lock = Lock() # 将锁实例化后传入
for num in range(10):
Process(target=f, args=(lock, num)).start()
三种进程数据共享的方式
multiprocessing.Queue
这里一定要使用multiprocessing
中的Queue
,如果你想用队列中的task_done()
与join()
方法,你应该导入JoinableQueue
这个队列。
multiprocessing.Queue 方法大全 | |
---|---|
方法名称 | 功能描述 |
Queue.qsize() | 返回当前队列的大小 |
Queue.empty() | 判断当前队列是否为空 |
Queue.full() | 判断当前队列是否已满 |
Queue.put(item, block=True, timeout=None) | 将item 放入队列中,block 参数为如果要操作的队列目前已满是否阻塞,timeout 为超时时间。 |
Queue.put_nowait(item) | 相当于 put(item, False) ,如果操作的队列已满则不进行阻塞,而是抛出Full 异常。 |
Queue.get(block=True, timeout=None) | 将项目从队列中取出,block 参数为如果要操作的队列目前为空是否阻塞,timeout 为超时时间。 |
Queue.get_nowait() | 相当于 get(False) ,如果要操作的队列为空则不进行阻塞,而是抛出Empty 异常。 |
Queue.close() | 指示当前进程将不会再往队列中放入对象。一旦所有缓冲区中的数据被写入管道之后,后台的线程会退出。这个方法在队列被gc 回收时会自动调用。 |
Queue.join_thread() | 等待后台线程。这个方法仅在调用了 close() 方法之后可用。这会阻塞当前进程,直到后台线程退出,确保所有缓冲区中的数据都被写入管道中。 |
Queue.cancel_join_thread() | 防止 join_thread() 方法阻塞当前进程。具体而言,这防止进程退出时自动等待后台线程退出。详见 join_thread() 。 |
进程队列multiprocessing.Queue
不同于线程队列queue.Queue
,进程队列的消耗和底层实现比线程队列的要复杂许多。还是因为各进程之间不能共享任何数据,所以只能通过映射的方式来传递数据。进程队列multiprocessing.Queue
作为数据安全类型的数据结构,放在多进程中做通信使用是非常合适的,但是同时它的消耗也是非常大的,能不使用则尽量不要使用。


import time
import multiprocessing
from multiprocessing import Queue,JoinableQueue
def task_1(q):
print("正在装东西..")
time.sleep(3)
q.put("玫瑰花") # 正在装东西
q.task_done() # 通知对方可以取了
def task_2(q):
q.join() # 阻塞等待通知,接到通知说明队列里里有东西了。
print("取到了",q.get()) # 取东西
if __name__ == ''__main__'':
q = JoinableQueue(maxsize=5) # 实例化队列
t1 = multiprocessing.Process(target=task_1,args=(q,),name="小明") # 将队列传进子进程任务中
t2 = multiprocessing.Process(target=task_2,args=(q,),name="小花")
t1.start()
t2.start()
# ==== 执行结果 ====
"""
正在装东西..
取到了 玫瑰花
"""
什么线程队列queue.Queue
不能做到进程间数据共享呢,这是因为进程队列multiprocessing.Queue
会采取一种映射的方式来同步数据,所以说进程队列的资源消耗比线程队列要庞大很多。线程中所有信息共享,所以线程队列根本不需要映射关系。进程队列只是告诉你可以这样使用它达到进程间的数据共享,但是并不推荐你滥用它。
multiprocessing.Pipe
除开使用进程队列来实现进程间的通信,multiprocessing
还提供了Pipe
管道来进行通信。他的资源消耗较少并且使用便捷,但是唯一的缺点便是只支持点对点。
Pipe
有点类似socket
通信。但是比socket
通信更加简单,它不需要去做字符串处理字节,先来看一个实例:


import multiprocessing
from multiprocessing import Pipe
def task_1(conn1):
conn1.send("hello,我是task1")
print(conn1.recv())
def task_2(conn2):
print(conn2.recv())
conn2.send("我收到了,我是task2")
if __name__ == ''__main__'':
conn1,conn2 = Pipe() # 创建两个电话
p1 = multiprocessing.Process(target=task_1,args=(conn1,)) # 一人一部电话
p2 = multiprocessing.Process(target=task_2,args=(conn2,))
p1.start()
p2.start()
p1.join()
p2.join()
# ==== 执行结果 ====
"""
hello,我是task1
我收到了,我是task2
"""
multiprocessing.Mangaer
除了进程队列multiprocessing.Queue
,管道Pipe
,multiprocessing
还提供了Manager
作为共享变量来提供使用,但是这种方式是不应该被直接使用的因为它本身相较于进程队列Queue
是数据不安全的。当多个进程同时修改一个共享变量势必导致结果出现问题,所以要想使用共享变量还得使用multiprocessin
提供的进程锁才行。
Manager
类是数据不安全的;
Mangaer
类支持的类型非常多,如:value
, Array
, List
, Dict
, Queue(进程池通信专用)
, Lock
等。
Mangaer
实现了上下文管理器,可使用with
语句创建多个对象。具体使用方法我们来看一下:


import multiprocessing
from multiprocessing import Manager
def task_1(dic):
dic["task_1"] = "大帅哥"
def task_2(dic):
dic["task_2"] = "大美女"
print(dic.get("task_1"))
if __name__ == ''__main__'':
with Manager() as m: # !!!!! 注意 !!!!!!! 如果对 Manager()中的数据类型进行频繁的操作,而进程又特别多的时候,请使用 Rlock 锁进行处理,这有可能引发线程不安全!!!
dic = m.dict() # 实例化出了一个字典,除此之外还有很多其他的数据类型
p1 = multiprocessing.Process(target=task_1,args=(dic,)) # 将字典传进来
p2 = multiprocessing.Process(target=task_2,args=(dic,))
p1.start() # 启动一定要放在with之后
p2.start()
p1.join()
p2.join()
# ==== 执行结果 ====
"""
大帅哥
"""


import multiprocessing
from multiprocessing import Manager
def task_1(dic):
for i in range(1000):
dic["count"] += 1
def task_2(dic):
for i in range(1000):
dic["count"] -= 1
if __name__ == ''__main__'':
with Manager() as m: # !!!!! 注意 !!!!!!! 如果对 Manager()中的数据类型进行频繁的操作,而进程又特别多的时候,请使用 Rlock 锁进行处理,这有可能引发线程不安全!!!
dic = m.dict({"count":0}) # 实例化出了一个字典,除此之外还有很多其他的数据类型
p1 = multiprocessing.Process(target=task_1,args=(dic,)) # 传字典
p2 = multiprocessing.Process(target=task_2,args=(dic,))
p1.start()
p2.start()
p1.join()
p2.join()
print(dic)
# ==== 执行结果 ====
"""
{''count'': -23}
"""


import multiprocessing
from multiprocessing import Manager
from multiprocessing import RLock
def task_1(dic,lock):
with lock:
for i in range(1000):
dic["count"] += 1
def task_2(dic,lock):
with lock:
for i in range(1000):
dic["count"] -= 1
if __name__ == ''__main__'':
lock = RLock() # 实例化锁
with Manager() as m: # !!!!! 注意 !!!!!!! 如果对 Manager()中的数据类型进行频繁的操作,而进程又特别多的时候,请使用 Rlock 锁进行处理,这有可能引发线程不安全!!!
dic = m.dict({"count":0}) # 实例化出了一个字典,除此之外还有很多其他的数据类型
p1 = multiprocessing.Process(target=task_1,args=(dic,lock,)) # 传字典,传锁
p2 = multiprocessing.Process(target=task_2,args=(dic,lock,))
p1.start()
p2.start()
p1.join()
p2.join()
print(dic)
# ==== 执行结果 ====
"""
{''count'': 0}
"""
我们今天的关于使用multiprocessing.Process并发进程数最多和并发进程中用于实现进程互斥的程序段的分享就到这里,谢谢您的阅读,如果想了解更多关于(并发编程)进程 (multiprocessing--Process实现进程并发)、112 Python程序中的进程操作-开启多进程(multiprocess.process)、day 32 multiprocessing 模块中 Process 方法,僵尸进程,孤儿进程,守护进程、multiprocessing 多进程模块的相关信息,可以在本站进行搜索。
本文标签: