[toc]
python基础二十五 并发编程-多进程
1.多任务处理
多任务处理就是使计算机同时处理多个任务
1.1 实现方式
- 多进程
- 多线程
1.2 串行、并发、并行示意图
串行
并发
并行
2.多进程
2.1 多进程涉及的概念
程序:是一个指令的集合,例如编写完的代码,还没有运行
进程:正在执行的程序,例如当运行一个程序的时候,就启动了一个进程
2.2 python多进程
1.程序开始运行时,首先会创建一个主进程
2.在主进程(父进程)下,可以创建新的进程(子进程),子进程依赖于主进程,如果主进程结束,程序会退出
3.python提供了非常好用的进程包multiprocessing,借助这个包,可以轻松完成从单进程到并发执行的转换
2.2.1 multiprocessing模块、类方法创建多进程
方法一:multiprocessing模块提供了一个Process类来创建一个进程对象
//代码示例1
from multiprocessing import Process
def run(name):
print(f"子进程 '{name}' 运行中")
if __name__ == "__main__": #windowns中防止递归执行创建子进程导致内存不足,Linux、Mac中可以不写
print("父进程启动")
p = Process(target=run,args=('我是传入的参数',)) #创建子进程
p.start() #启动进程
print(p.name) #打印进程名字,可以自定义
p.join() #告知主进程等待子进程结束
print("子进程结束")
结果:
父进程启动
Process-1
子进程 '我是传入的参数' 运行中
子进程结束
//代码示例2 创建多个子进程、自定义进程名字
from multiprocessing import Process
def run1(name,sex):
print(f"子进程 '{name}' 运行中,我是{sex}的")
def run2(name,sex):
print(f"子进程 '{name}' 运行中,我是{sex}的")
if __name__ == "__main__":
print("父进程启动")
p1 = Process(target=run1,args=('我是子进程1','男',),name='自定义子进程1')
p2 = Process(target=run2,args=('我是子进程2','女',),name='自定义子进程2')
p1.start()
p2.start()
print(p1.name)
print(p2.name)
p1.join()
p2.join()
print("子进程结束")
结果:
父进程启动
自定义子进程1
自定义子进程2
子进程 '我是子进程1' 运行中,我是男的
子进程 '我是子进程2' 运行中,我是女的
子进程结束
方法二:类方法创建多进程
创建新的进程还可以使用类的方式,可以自定义一个类,继承Process类,每次实例化这个类的时候,就等同于实例化一个进程对象
//multiprocessing模块创建多进程方法
from multiprocessing import Process
def run(name):
print(f"我是进程:'{name}'")
if __name__ == "__main__":
p = Process(target=run,args=('呵呵',))
p.start()
p.join()
print("子进程结束")
结果:
我是进程:'呵呵'
子进程结束
//基于以上代码,使用类的方式创建多进程
from multiprocessing import Process
class Custum(Process):
def __init__(self,name):
super().__init__()
self.name = name
def run(self): #重写run方法,只能有一个且必须叫run
print(f"我是进程:'{self.name}'")
if __name__ == "__main__":
p = Custum("类创建进程")
p.start()
p.join()
print("子进程结束")
结果:
我是进程:'类创建进程'
子进程结束
使用类方法创建多进程
from multiprocessing import Process
class A(Process):
def __init__(self,name):
super().__init__()
self.name = name
def hehe(self): #名称必须叫run,否则运行结果会有问题
print(f"子进程 {self.name} 运行中")
if __name__ == "__main__":
print("父进程启动")
p = A("类创建进程")
p.start()
p.join()
print("子进程结束")
结果:
父进程启动
子进程结束
2.2.2 __name == "__main__"
参数
1.一个python的文件有两种使用的方法,第一是直接作为程序执行,第二是import到其他的python程序 中被调用(模块重用)执行。
2.因此
if __name__ == 'main':
的作用就是控制这两种情况执行代码的过程,__name__
是内置变量,用于表示当前模块的名字3.在
if __name__ == 'main':
下的代码只有在文件作为程序直接执行才会被执行,而import到其他程序中是不会被执行的4.在
Windows 上,子进程会自动 import 启动它的这个文件
,而在 import 的时候是会执行这些语句的。 如果不加if __name__ == "__main__":
的话就会无限递归创建子进程 所以必须把创建子进程的部分用那个 if 判断保护起来 import 的时候__name__
不是__main__
,就不会递归运行了
//错误示例
from multiprocessing import Process
def run(name):
print(f"我是进程: '{name}'")
p = Process(target=run,args=('呵呵',))
p.start()
p.join()
print("子进程结束")
结果:
windows会报一堆错,Linux、Mac没有问题
//正确示例
from multiprocessing import Process
def run(name):
print(f"我是进程: '{name}'")
if __name__ == "__main__":
p = Process(target=run,args=('呵呵',))
p.start()
p.join()
print("子进程结束")
我是进程: '呵呵'
子进程结束
2.2.3 多进程参数
-
target
表示调用对象,即子进程要执行的任务
p = Process(target=对象名(函数名))
-
args
表示调用对象的位置参数元组,args=(传入的参数,)
⚠️括号中传入的参数后面必须加逗号
p = Process(target=xxx,args=('传入的参数',))
-
name
表示进程的名称
p = Process(target=xxx,args=('传入的参数',),name='子进程名称')
2.2.4 Process类常用方法
-
p.start()
启动进程,并调用该子进程中的p.run()
-
p.run()
进程启动时运行的方法,正是它去调用
target指定的函数
,我们自定义类中的一定要实现该方法 -
p.terminate()
强制终止进程p,不会进行任何清理操作
-
p.is_alive()
如果子进程p仍然运行,返回True,用来判断进程是否还在运行
-
p.join([超时时间])
主进程等待p终止,timeout是可选的超时时间
2.2.5 Process类常用属性
-
name
当前进程实例别名,默认为Process-N,N为从1开始递增的整数,可以指定进程名称
-
pid
当前进程实例的PID
2.2.6 多进程中的全局变量
全局变量在多个进程中不共享,进程之间的数据是独立的,默认情况下互不影响
from multiprocessing import Process
num = 10
def f1():
global num
num += 1
print(f"第一个子进程中的num值为:{num}")
def f2():
global num
num += 2
print(f"第二个子进程中的num值为:{num}")
if __name__ == "__main__":
p1 = Process(target=f1)
p2 = Process(target=f2)
p1.start()
p2.start()
p1.join()
p2.join()
print(f"全局变量中的num值为:{num}")
结果:
第一个子进程中的num值为:11
第二个子进程中的num值为:12
全局变量中的num值为:10
2.3 进程池
进程池:用来创建多个进程
当需要创建多子进程数量不多时,可以直接利用multiprocessing中的Process动态生成多个进程,但如果是大量的进程目标,手动创建进程的工作量巨大,此时就可以利用multiprocessing模块提供的
Pool
初始化
Pool
时,可以指定一个最大进程数
,当有新的请求提交到Pool
中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求,但是如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行
from multiprocessing import Pool
import time
def r1():
print("123")
time.sleep(5)
def r2():
print("abc")
time.sleep(3)
if __name__ == "__main__":
po = Pool(5) #定义一个进程池,最大进程数为5,不写默认为CPU核心数
for i in range(5):
po.apply_async(r1) #apply_async选择要调用的目标,每次循环会用空出来的子进程去调用目标
po.apply_async(r2)
po.close() #进程关闭之后不再接受新的请求
po.join() #等待po中所有子进程结束,必须放在close后面
结果:
第一秒会出现以下结果,但是后续不确定
因为进程池中定义了最大进程数为5
123
abc
123
abc
123
2.3.1 进程池常用函数解析
multiprocessing.Pool常用函数解析
-
apply_async(func[,args[,kwds]]):
使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能进行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表
-
apply(func[,args[,kwds]])
使用阻塞方式调用func
-
close()
关闭Pool,使其不再接受新的任务
-
join()
主进程阻塞,等待子进程的退出,必须在close或terminate之后使用
2.4 进程间通信
2.4.1 队列Q实现进程间数据传递
多进程之间,默认是不共享数据的
通过Queue(队列Q)可以实现进程间数据传递
Q本身是一个消息队列
Queue方法说明
-
Queue.put([num])
存入消息,num不写或者为负数不限制
-
Queue.qsize()
返回当前队列包含的消息数量
-
Queue.empty()
如果队列为空,返回True,反之返回False
-
Queue.full()
如果队列满了,返回True,反之返回False
-
Queue.get([block[,timeout]])
获取队列中的一条消息,然后将其从队列移除,block默认值为True
- 如果block使用默认值,且没有设置timeout(单位秒),消息队列如果为空,此时程序将被阻塞(停在读取状态),直到从消息队列读到消息为止
- 如果设置了timeout,则会等待timeout秒,若还没有读取到任何消息,则抛出"Queue.Empty"异常
- 如果block值为False,消息队列如果为空,则会立刻抛出"Queue.Empty"异常
2.4.1.1 存入消息
Queue.put([num]) #存入消息,num不写或者为负数不限制
//存入消息,最多存入3条,此时运行程序不回报错
from multiprocessing import Queue
q = Queue(3)
q.put("存入消息1")
q.put("存入消息2")
q.put("存入消息3")
//存入消息,最多存入3条,如果此时存入4条,程序会卡住,知道能够存入为止
from multiprocessing import Queue
q = Queue(3)
q.put("存入消息1")
q.put("存入消息2")
q.put("存入消息3")
q.put("存入消息4") #这一条消息不会存入到队列中,知道队列有空闲可以存入为止
2.4.1.2 读取消息
Queue.get([block[,timeout]]) #获取队列中的一条消息,然后将其从队列移除,block默认值为True
//存入消息
from multiprocessing import Queue
q = Queue(3) #初始化一个Queue对象,最多可接受3条消息
q.put("存入消息1") #添加消息,数据类型不限
q.put("存入消息2")
q.put("存入消息3")
//读取消息,默认方式读取
from multiprocessing import Queue
q = Queue(3) #初始化一个Queue对象,最多可接受3条消息
q.put("存入消息1") #添加消息,数据类型不限
q.put("存入消息2")
q.put("存入消息3")
print(q.get())
print(q.get())
print(q.get())
结果:
存入消息1
存入消息2
//读取消息,指定block值为False
如果block值为False,消息队列如果为空,则会立刻抛出"Queue.Empty"异常
from multiprocessing import Queue
q = Queue(3)
print(q.get(block=False))
结果:
queue.Empty
⚠️此方法有时会报错队列为空,有时就没有问题,win和mac中一样,linux会始终报错队列为空
from multiprocessing import Queue
q = Queue(3)
q.put("存入消息1")
q.put("存入消息2")
q.put("存入消息3")
print(q.get(block=False))
//读取消息,指定读取空队列超时时间
from multiprocessing import Queue
q = Queue(3)
print(q.get(timeout=3))
结果:
等待3秒后会报错
queue.Empty