python多进程 基本概念
Python中的多进程是通过multiprocessing
包来实现的,和多线程的threading.Thread差不多,它可以利用multiprocessing.Process对象来创建一个进程对象
这个进程对象的方法和线程对象的方法差不多也有start(),run(),join()等方法,其中有一个方法不同Thread线程对象中的守护线程方法是setDeamon,而Process进程对象的守护进程是通过设置daemon属性来完成的
与多线程的共享式内存不同,由于各个进程都是相互独立的,因此进程间通信再多进程中扮演这非常重要的角色,Python中我们可以使用multiprocessing模块中的pipe
、queue
、Array
、Value
等等工具来实现进程间通讯和数据共享,但是在编写起来仍然具有很大的不灵活性
任务类型
同步与异步
同步 就是指一个进程在执行某个请求的时候,若该请求需要一段时间才能返回信息 那么这个进程将会一直等待 下去,直到收到返回信息才继续执行下去
异步 是指进程不需要一直等 下去,而是继续执行下面的操作,不管其他进程的状态 当有消息返回时系统会通知进程进行处理,这样可以提高执行的效率
IO密集和计算密集
对于IO密集型任务 : python的多线程能够节省时间
对于计算(CPU)密集型任务 : Python的多线程并没有用处,建议使用多进程
其他组合搭配
python使用多核,即开多个进程
方法一: 协程+多进程,使用方法简单,效率还可以,一般使用该方法
协程yield是你自己写的,是自己定义什么时候切换进程
方法二:IO多路复用,使用复杂,但效率很高,不常用
多进程相关模块
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Process(用于创建进程):通过创建一个Process对象然后调用它的start()方法来生成进程。Process遵循threading.Thread的API。 Pool(用于创建进程管理池):可以创建一个进程池,该进程将执行与Pool该类一起提交给它的任务,当子进程较多需要管理时使用。 Queue(用于进程通信,资源共享):进程间通信,保证进程安全。 Value,Array(用于进程通信,资源共享): Pipe(用于管道通信):管道操作。 Manager(用于资源共享):创建进程间共享的数据,包括在不同机器上运行的进程之间的网络共享。 Condition Event:用来实现进程间同步通信。 Lock:当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。 RLock Semaphore:用来控制对共享资源的访问数量,例如池的最大连接数。
python多线程低效原因
GIL 的全称是 Global Interpreter Lock(全局解释器锁),来源是 Python 设计之初的考虑,为了数据安全所做的决定
某个线程想要执行,必须先拿到 GIL,我们可以把 GIL 看作是“通行证”,并且在一个 Python 进程中,GIL 只有一个
拿不到通行证的线程,就不允许进入 CPU 执行
目前 Python 的解释器有多种,例如:
CPython :CPython 是用C语言实现的 Python 解释器,作为官方实现,它是最广泛使用的 Python 解释器
PyPy :PyPy 是用RPython实现的解释器。RPython 是 Python 的子集, 具有静态类型。这个解释器的特点是即时编译,支持多重后端(C, CLI, JVM)。PyPy 旨在提高性能,同时保持最大兼容性(参考 CPython 的实现)
Jython :Jython 是一个将 Python 代码编译成 Java 字节码的实现,运行在JVM(Java Virtual Machine)上。另外,它可以像是用 Python 模块一样,导入 并使用任何Java类
IronPython :IronPython 是一个针对 .NET 框架的 Python 实现。它 可以用 Python 和 .NET framewor k的库,也能将 Python 代码暴露给 .NET 框架中的其他语言
GIL 只在 CPython 中才有,而在 PyPy 和 Jython 中是没有 GIL 的
注意: 每次释放GIL锁,线程进行锁竞争、切换线程,会消耗资源
这就导致打印线程执行时长,会发现耗时更长的原因
并且由于 GIL 锁存在,Python 里一个进程永远只能同时执行一个线程(拿到 GIL 的线程才能执行),这就是为什么在多核CPU上,Python 的多线程效率并不高的根本原因
多进程实现方式 Process
普通Process
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from multiprocessing import Processdef func (name ): print ('测试%s多进程' %name) if __name__ == '__main__' : process_list = [] for i in range (5 ): p = Process(target=func, args=('Python' ,)) p.start() process_list.append(p) for i in process_list: p.join() print ('结束测试' )
1 2 3 4 5 6 7 8 测试Python多进程 测试Python多进程 测试Python多进程 测试Python多进程 测试Python多进程 结束测试 Process finished with exit code 0
上面的代码开启了5个子进程去执行函数,我们可以观察结果,是同时打印的,这里实现了真正的并行操作,就是多个CPU同时执行任务。
我们知道进程是python中最小的资源分配单元 ,也就是进程中间的数据,内存是不共享的 ,每启动一个进程,都要独立分配资源和拷贝访问的数据,所以进程的启动和销毁的代价是比较大 了,所以在实际中使用多进程,要根据服务器的配置来设定。
继承Process
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from multiprocessing import Processclass MyProcess (Process ): def __init__ (self,name ): super (MyProcess,self).__init__() self.name = name def run (self ): print ('测试%s多进程' % self.name) if __name__ == '__main__' : process_list = [] for i in range (5 ): p = MyProcess('Python' ) p.start() process_list.append(p) for i in process_list: p.join() print ('结束测试' )
1 2 3 4 5 6 7 8 测试Python多进程 测试Python多进程 测试Python多进程 测试Python多进程 测试Python多进程 结束测试 Process finished with exit code 0
通过类继承的方法来实现的,python多进程的第二种实现方式也是一样的,效果和第一种方式一样
Process类的其他方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 构造方法: Process ([group [, target [, name [, args [, kwargs ]]]]]) group : 线程组 target: 要执行的方法 name: 进程名 args/kwargs: 要传入方法的参数 实例方法: is_alive():返回进程是否在运行,bool类型。 join([timeout ]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。 start ():进程准备就绪,等待CPU调度 run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。 terminate():不管任务是否完成,立即停止工作进程 属性: daemon:和线程的setDeamon功能一样 name:进程名字 pid:进程号
进程池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 from multiprocessing import Pool,cpu_countimport os, time, randomdef fun1 (name ): print ('Run task %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3 ) end = time.time() print ('Task %s runs %0.2f seconds.' % (name, (end - start))) return f'{name} : {os.getpid()} ' if __name__=='__main__' : results = [] pool = Pool(cpu_count()-1 ) for i in range (4 ): results.append(pool.apply_async(func=fun1, args=(i,))) pool.close() pool.join() print () for result in results: print (result.get()) print ('All Done!!!' ) print ('结束测试' )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 Run task 0 (30716 )... Run task 1 (15020 )... Run task 2 (23200 )... Run task 3 (5884 )... Task 0 runs 1.34 seconds. Task 2 runs 1.53 seconds. Task 1 runs 1.88 seconds. Task 3 runs 2.48 seconds. 0 : 30716 1 : 15020 2 : 23200 3 : 5884 All Done!!! 结束测试 Process finished with exit code 0
对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from multiprocessing import Pool, cpu_count, Managerfrom functools import partialdef job (data, mgrDicTask, lock ): res = f'a+b = {data[0 ] + data[1 ]} ' lock.acquire() tempDic = list (mgrDicTask['result' ]) tempDic.append(res) mgrDicTask['result' ] = tempDic lock.release() return res if __name__ == "__main__" : data = [[2 , 3 ], [3 , 4 ], [2 , 5 ]] pool = Pool(processes=cpu_count() - 1 ) mgr = Manager() lock = mgr.Lock() mgrDicTask = mgr.dict () mgrDicTask['result' ] = [] fun = partial(job, mgrDicTask=mgrDicTask, lock=lock) pool.map_async(fun, data) pool.close() pool.join() print (mgrDicTask['result' ]) print ('All Done!!!' )
1 2 3 4 ['a+b = 7' , 'a+b = 7' , 'a+b = 5' ] All Done!!! Process finished with exit code 0
其他进程池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 def process_pool_test (url_list ): book_list = [] pool = ProcessPoolExecutor(max_workers=20 ) start = time.time() for url in url_list: time.sleep(0.5 ) result = pool.submit(get_book_info, url) book_list.append(result) pool.shutdown() print ('time: ' , time.time() - start) book_name_list = [] author_list = [] author_info_list = [] print ('book_list: ' , len (book_list)) for future in book_list: book_name_list.extend(future.result()['name' ]) author_list.extend(future.result()['author' ]) author_info_list.extend(future.result()['info' ]) ExcelUtils.write_data_to_excel('bookInfo' , book_name_list, author_list, author_info_list) if __name__ == '__main__' : sys.setrecursionlimit(10000 ) url_list = ['https://www.edge.org/library' ] for i in range (1 , 52 ): url_list.append('https://www.edge.org/library?page=%s' % i) thread_pool_test(url_list)
多进程通信
内容提取神器 beautiful Soup 的用法
进程是系统独立调度核分配系统资源(CPU、内存)的基本单位,进程之间是相互独立的 ,每启动一个新的进程相当于把数据进行了一次克隆,子进程里的数据修改无法影响到主进程中的数据,不同子进程之间的数据也不能共享,这是多进程在使用中与多线程最明显的区别
但是难道Python多进程中间难道就是孤立的吗?
当然不是,python也提供了多种方法实现了多进程中间的通信和数据共享(可以修改一份数据)
进程队列Queue 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 """ @version: v1.0 @author: narutohyc @file: multiprocessing_queue.py @Description: 多进程队列使用示例 @time: 2020/5/14 15:53 """ from multiprocessing import Process, Queue, Managerfrom multiprocessing import cpu_countimport osimport timeclass Task : def __init__ (self, task_name: str , data: list , **kwargs ): self.task_name = task_name self.data = data def __repr__ (self ): return f'task_name:{self.task_name} data:{self.data} ' class MultiProcessingQueue : def __init__ (self ): self.num_of_worker = cpu_count() self.size_of_queue = 10 def start_work (self ): print ("start_work 开始" ) process_list = [] work_queue = Queue(self.size_of_queue) dealed_sample_lst = Manager().list () sent = Process(target=self.productor, args=(work_queue, dealed_sample_lst,)) sent.start() process_list.append(sent) for _ in range (self.num_of_worker - 1 ): process = Process(target=self.consumer, args=(work_queue, dealed_sample_lst,)) process.start() process_list.append(process) [process.join() for process in process_list] print ("start_work 结束" ) return dealed_sample_lst def productor (self, work_queue: Queue, dealed_sample_lst ): print ("生产者开始工作" ) for ii in range (100 ): work_queue.put(Task(task_name=f'{str (os.getpid())} -{str (ii)} ' , data=[ii for _ in range (2 )])) if ii % 30 == 0 : time.sleep(1 ) print ("生产者休息ing" ) ''' JoinableQueue 比Queue多了task_done() 与join()两个函数,多用于生产者消费者问题。 task_done()是用在get()后,发送通知说我get完了 join()是说Queue里所有的task都已处理。 ''' for _ in range (self.num_of_worker - 1 ): work_queue.put(None ) print ("生产者工作结束" ) def consumer (self, work_queue: Queue, dealed_sample_lst ): while True : task: Task = work_queue.get() if task is None : break task.data = [ii * 2 for ii in task.data] dealed_sample_lst.append(task) print (task) print (f'进程{os.getpid()} 处理结束' ) def multiprocessing_queue_test (): multiprocessing_queue = MultiProcessingQueue() dealed_sample_lst = multiprocessing_queue.start_work() print ("测试结束" ) if __name__ == '__main__' : multiprocessing_queue_test()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 start_work 开始 生产者开始工作 task_name:28868 -0 data :[0 , 0 ] 生产者休息ing task_name:28868 -1 data :[2 , 2 ] ... task_name:28868 -6 data :[12 , 12 ] 生产者休息ing task_name:28868 -31 data :[62 , 62 ] ... task_name:28868 -58 data :[116 , 116 ] 生产者休息ing task_name:28868 -61 data :[122 , 122 ] ... task_name:28868 -64 data :[128 , 128 ] ... 生产者休息ing task_name:28868 -91 data :[182 , 182 ] ... task_name:28868 -96 data :[192 , 192 ] 生产者工作结束 进程29208 处理结束 task_name:28868 -97 data :[194 , 194 ] 进程20632 处理结束 进程28496 处理结束 task_name:28868 -98 data :[196 , 196 ] 进程30200 处理结束 进程26512 处理结束 进程29776 处理结束 task_name:28868 -99 data :[198 , 198 ] 进程30072 处理结束 start_work 结束 测试结束
上面的代码结果可以看到我们主进程中可以通过Queue获取子进程中put的数据,实现进程间的通信
JoinableQueue队列 JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理
通知进程是使用共享的信号和条件变量来实现的
参数介绍:
maxsize : 是队列中允许最大项数,省略则无大小限制
方法介绍:
q.task_done() :使用者使用此方法发出信号,表示q.get()的返回项目已经被处理 如果调用此方法的次数大于从队列中删除项目的数量将引发ValueError异常
q.join() :生产者调用此方法进行阻塞,直到队列中所有的项目均被处理 阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 """ @version: v1.0 @author: narutohyc @file: multiprocessing_queue.py @Description: 多进程队列使用示例 @time: 2020/5/14 15:53 """ from multiprocessing import Process, Queue, JoinableQueue, Managerfrom multiprocessing import cpu_countimport os, timeclass Task : def __init__ (self, task_name: str , data: list , **kwargs ): self.task_name = task_name self.data = data def __repr__ (self ): return f'task_name:{self.task_name} data:{self.data} ' class MultiProcessingJoinableQueue : def __init__ (self ): self.num_of_worker = cpu_count() self.size_of_queue = 10 def start_work (self ): print ("start_work 开始" ) process_list = [] work_queue = JoinableQueue(self.size_of_queue) dealed_sample_lst = Manager().list () sent = Process(target=self.productor, args=(work_queue, dealed_sample_lst,)) process_list.append(sent) for _ in range (self.num_of_worker - 1 ): process = Process(target=self.consumer, args=(work_queue, dealed_sample_lst,)) process.daemon = True process_list.append(process) [process.start() for process in process_list] [process.join() for process in process_list[:1 ]] print ("start_work 结束" ) return dealed_sample_lst def productor (self, work_queue: Queue, dealed_sample_lst ): print ("生产者开始工作" ) for ii in range (100 ): work_queue.put(Task(task_name=f'{str (os.getpid())} -{str (ii)} ' , data=[ii for _ in range (2 )])) if ii % 30 == 0 : time.sleep(1 ) print ("生产者休息ing" ) print ("生产者工作结束" ) work_queue.join() def consumer (self, work_queue: Queue, dealed_sample_lst ): while True : task: Task = work_queue.get() if task is None : break task.data = [ii * 2 for ii in task.data] dealed_sample_lst.append(task) print (task) work_queue.task_done() print (f'进程{os.getpid()} 处理结束' ) def multiprocessing_joinablequeue_test (): multiprocessing_joinablequeue = MultiProcessingJoinableQueue() dealed_sample_lst = multiprocessing_joinablequeue.start_work() print ("测试结束" ) if __name__ == '__main__' : multiprocessing_joinablequeue_test()
结果输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 start_work 开始 生产者开始工作 task_name:14608-0 data :[0, 0] 生产者休息ing task_name :14608-1 data :[2, 2]... task_name :14608-7 data :[14, 14]生产者休息ing task_name :14608-31 data :[62, 62]... task_name :14608-60 data :[120, 120]生产者休息ing task_name :14608-61 data :[122, 122... task_name :14608-90 data :[180, 180]生产者休息ing 生产者工作结束 task_name :14608-91 data :[182, 182]... task_name :14608-93 data :[186, 186]start_work 结束测试结束
管道Pipe Pipe的本质是进程之间的用管道数据传递,而不是数据共享,这和socket有点像
pipe()返回两个连接对象分别表示管道的两端,每端都有send()和recv()函数
如果两个进程试图在同一时间的同一端进行读取和写入那么,这可能会损坏管道中的数据
管道是数据不安全的,多个进程同时收发数据可道引起数据异常,这时候就应该配合锁使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from multiprocessing import Process, Pipedef fun1 (conn ): print ('子进程发送消息:' ) conn.send('你好主进程' ) print ('子进程接受消息:' ) print (conn.recv()) conn.close() if __name__ == '__main__' : conn1, conn2 = Pipe() p = Process(target=fun1, args=(conn2,)) p.start() print ('主进程接受消息:' ) print (conn1.recv()) print ('主进程发送消息:' ) conn1.send("你好子进程" ) p.join() print ('结束测试' )
1 2 3 4 5 6 7 8 9 主进程接受消息: 子进程发送消息: 子进程接受消息: 你好主进程 主进程发送消息: 你好子进程 结束测试 Process finished with exit code 0
上面可以看到主进程和子进程可以相互发送消息
Managers Queue和Pipe只是实现了数据交互,并没实现数据共享 ,即一个进程去更改另一个进程的数据,那么就要用到Managers
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from multiprocessing import Process, Managerdef fun1 (dic,lis,index ): dic[index] = 'a' dic['2' ] = 'b' lis.append(index) if __name__ == '__main__' : with Manager() as manager: dic = manager.dict () l = manager.list (range (5 )) process_list = [] for i in range (10 ): p = Process(target=fun1, args=(dic,l,i)) p.start() process_list.append(p) for res in process_list: res.join() print (dic) print (l)
1 2 {0 : 'a' , '2' : 'b' , 3 : 'a' , 1 : 'a' , 2 : 'a' , 4 : 'a' , 5 : 'a' , 7 : 'a' , 6 : 'a' , 8 : 'a' , 9 : 'a' } [0 , 1 , 2 , 3 , 4 , 0 , 3 , 1 , 2 , 4 , 5 , 7 , 6 , 8 , 9 ]
可以看到主进程定义了一个字典和一个列表,在子进程中,可以添加和修改字典的内容 在列表中插入新的数据,实现进程间的数据共享,即可以共同修改同一份数据
注意事项 无法调用多层生成器(待验证) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 """ @version: v0.1 @author: narutohyc @file: text.py @Description: @time: 2020/5/29 19:51 """ from multiprocessing import Process, Queue, JoinableQueue, Managerfrom multiprocessing import cpu_countimport os, timefrom abc import (ABC, abstractmethod, ABCMeta) from comm.logger.logger_config import loggerclass SampleIterator (ABC, metaclass=ABCMeta): def __init__ (self ): pass @abstractmethod def __iter__ (self ): ''' 样本处理并返回 ''' pass class DataSource1 (SampleIterator ): def __init__ (self ): super (DataSource1, self).__init__() def __iter__ (self ): for ii in range (10 ): yield ii class DataSource2 (SampleIterator ): def __init__ (self, data_source ): super (DataSource2, self).__init__() self.data_source = data_source def __iter__ (self ): for ii in self.data_source: yield ii class DataSource3 (SampleIterator ): def __init__ (self, data_source ): super (DataSource3, self).__init__() self.data_source = data_source def __iter__ (self ): for ii in self.data_source: yield ii class DataSource4 (SampleIterator ): def __init__ (self, data_source ): super (DataSource4, self).__init__() self.data_source = data_source def __iter__ (self ): for ii in self.data_source: yield ii class HUCY (): def __init__ (self, data_source=None ): self.num_of_worker = cpu_count() self.size_of_queue = 2 self.data_source = data_source def start_work (self ): process_list = [] work_queue = Queue(self.size_of_queue) produce_num = 1 for _ in range (produce_num): sent = Process(target=self.productor, args=(work_queue,)) sent.start() process_list.append(sent) for _ in range (self.num_of_worker - produce_num): process = Process(target=self.consumer, args=(work_queue,)) process.start() process_list.append(process) [process.join() for process in process_list[:produce_num]] for _ in range (self.num_of_worker - produce_num): work_queue.put(None ) [process.join() for process in process_list[produce_num:]] print ("start_work 结束" ) def productor (self, work_queue ): [work_queue.put(ii) for ii in self.data_source] logger.info("生产者结束" ) def consumer (self, work_queue ): while True : data = work_queue.get() if data is None : break logger.info(f"数据: {data} " ) logger.info("消费者结束" ) def hyc_test (): da1=DataSource2(DataSource3(DataSource4(DataSource1()))) da2 = DataSource2(DataSource3(DataSource4(da1))) da3 = DataSource2(DataSource3(DataSource4(da2))) da4 = DataSource2(DataSource3(DataSource4(da3))) da5 = DataSource2(DataSource3(DataSource4(da4))) da6 = DataSource2(DataSource3(DataSource4(DataSource2(da5)))) hucy = HUCY(da6) hucy.start_work() if __name__ == '__main__' : hyc_test()
几个问题: