python多进程

基本概念

Python中的多进程是通过multiprocessing包来实现的,和多线程的threading.Thread差不多,它可以利用multiprocessing.Process对象来创建一个进程对象

这个进程对象的方法和线程对象的方法差不多也有start(),run(),join()等方法,其中有一个方法不同Thread线程对象中的守护线程方法是setDeamon,而Process进程对象的守护进程是通过设置daemon属性来完成的

与多线程的共享式内存不同,由于各个进程都是相互独立的,因此进程间通信再多进程中扮演这非常重要的角色,Python中我们可以使用multiprocessing模块中的pipequeueArrayValue等等工具来实现进程间通讯和数据共享,但是在编写起来仍然具有很大的不灵活性

img

任务类型

同步与异步

  • 同步就是指一个进程在执行某个请求的时候,若该请求需要一段时间才能返回信息
    那么这个进程将会一直等待下去,直到收到返回信息才继续执行下去
  • 异步是指进程不需要一直等下去,而是继续执行下面的操作,不管其他进程的状态
    当有消息返回时系统会通知进程进行处理,这样可以提高执行的效率

IO密集和计算密集

  • 对于IO密集型任务: python的多线程能够节省时间
  • 对于计算(CPU)密集型任务: Python的多线程并没有用处,建议使用多进程

其他组合搭配

python使用多核,即开多个进程

  • 方法一: 协程+多进程,使用方法简单,效率还可以,一般使用该方法

    协程yield是你自己写的,是自己定义什么时候切换进程

  • 方法二:IO多路复用,使用复杂,但效率很高,不常用

多进程相关模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 创建管理进程模块:
Process(用于创建进程):通过创建一个Process对象然后调用它的start()方法来生成进程。Process遵循threading.Thread的API。
Pool(用于创建进程管理池):可以创建一个进程池,该进程将执行与Pool该类一起提交给它的任务,当子进程较多需要管理时使用。
Queue(用于进程通信,资源共享):进程间通信,保证进程安全。
Value,Array(用于进程通信,资源共享):
Pipe(用于管道通信):管道操作。
Manager(用于资源共享):创建进程间共享的数据,包括在不同机器上运行的进程之间的网络共享。

# 同步子进程模块:
Condition
Event:用来实现进程间同步通信。
Lock:当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。
RLock
Semaphore:用来控制对共享资源的访问数量,例如池的最大连接数。

python多线程低效原因

GIL的全称是 Global Interpreter Lock(全局解释器锁),来源是 Python 设计之初的考虑,为了数据安全所做的决定

某个线程想要执行,必须先拿到 GIL,我们可以把 GIL 看作是“通行证”,并且在一个 Python 进程中,GIL 只有一个

拿不到通行证的线程,就不允许进入 CPU 执行

目前 Python 的解释器有多种,例如:

  • CPython:CPython 是用C语言实现的 Python 解释器,作为官方实现,它是最广泛使用的 Python 解释器
  • PyPy:PyPy 是用RPython实现的解释器。RPython 是 Python 的子集, 具有静态类型。这个解释器的特点是即时编译,支持多重后端(C, CLI, JVM)。PyPy 旨在提高性能,同时保持最大兼容性(参考 CPython 的实现)
  • Jython:Jython 是一个将 Python 代码编译成 Java 字节码的实现,运行在JVM(Java Virtual Machine)上。另外,它可以像是用 Python 模块一样,导入 并使用任何Java类
  • IronPython:IronPython 是一个针对 .NET 框架的 Python 实现。它 可以用 Python 和 .NET framewor k的库,也能将 Python 代码暴露给 .NET 框架中的其他语言

GIL 只在 CPython 中才有,而在 PyPy 和 Jython 中是没有 GIL 的

注意: 每次释放GIL锁,线程进行锁竞争、切换线程,会消耗资源

这就导致打印线程执行时长,会发现耗时更长的原因

并且由于 GIL 锁存在,Python 里一个进程永远只能同时执行一个线程(拿到 GIL 的线程才能执行),这就是为什么在多核CPU上,Python 的多线程效率并不高的根本原因

多进程实现方式

Process

普通Process

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing import  Process

def func(name):
print('测试%s多进程' %name)

if __name__ == '__main__':
process_list = []
for i in range(5): #开启5个子进程执行fun1函数
p = Process(target=func, args=('Python',)) #实例化进程对象
p.start()
process_list.append(p)

for i in process_list:
p.join()

print('结束测试')
1
2
3
4
5
6
7
8
测试Python多进程
测试Python多进程
测试Python多进程
测试Python多进程
测试Python多进程
结束测试

Process finished with exit code 0

上面的代码开启了5个子进程去执行函数,我们可以观察结果,是同时打印的,这里实现了真正的并行操作,就是多个CPU同时执行任务。

我们知道进程是python中最小的资源分配单元,也就是进程中间的数据,内存是不共享的,每启动一个进程,都要独立分配资源和拷贝访问的数据,所以进程的启动和销毁的代价是比较大了,所以在实际中使用多进程,要根据服务器的配置来设定。

继承Process

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from multiprocessing import  Process

class MyProcess(Process): #继承Process类
def __init__(self,name):
super(MyProcess,self).__init__()
self.name = name

def run(self):
print('测试%s多进程' % self.name)


if __name__ == '__main__':
process_list = []
for i in range(5): #开启5个子进程执行fun1函数
p = MyProcess('Python') #实例化进程对象
p.start()
process_list.append(p)

for i in process_list:
p.join()

print('结束测试')
1
2
3
4
5
6
7
8
测试Python多进程
测试Python多进程
测试Python多进程
测试Python多进程
测试Python多进程
结束测试

Process finished with exit code 0

通过类继承的方法来实现的,python多进程的第二种实现方式也是一样的,效果和第一种方式一样

Process类的其他方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
构造方法:

Process([group [, target [, name [, args [, kwargs]]]]])
  group: 线程组
  target: 要执行的方法
  name: 进程名
  args/kwargs: 要传入方法的参数

实例方法:
  is_alive():返回进程是否在运行,bool类型。
  join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
  start():进程准备就绪,等待CPU调度
  run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
  terminate():不管任务是否完成,立即停止工作进程

属性:
  daemon:和线程的setDeamon功能一样
  name:进程名字
  pid:进程号

进程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# apply_async:异步
from multiprocessing import Pool,cpu_count
import os, time, random

def fun1(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
return f'{name}: {os.getpid()}'

if __name__=='__main__':
results = []
pool = Pool(cpu_count()-1)

for i in range(4):
results.append(pool.apply_async(func=fun1, args=(i,)))

pool.close()
pool.join()
print()
for result in results:
print(result.get())
print('All Done!!!')

print('结束测试')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Run task 0 (30716)...
Run task 1 (15020)...
Run task 2 (23200)...
Run task 3 (5884)...
Task 0 runs 1.34 seconds.
Task 2 runs 1.53 seconds.
Task 1 runs 1.88 seconds.
Task 3 runs 2.48 seconds.

0: 30716
1: 15020
2: 23200
3: 5884
All Done!!!
结束测试

Process finished with exit code 0

对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# map_async:异步
from multiprocessing import Pool, cpu_count, Manager
from functools import partial

def job(data, mgrDicTask, lock):
res = f'a+b = {data[0] + data[1]}'
lock.acquire()
# Manager对象无法监测到它引用的可变对象值的修改,需要通过触发__setitem__方法来让它获得通知
tempDic = list(mgrDicTask['result'])
tempDic.append(res)
mgrDicTask['result'] = tempDic
lock.release()
return res

if __name__ == "__main__":
data = [[2, 3], [3, 4], [2, 5]]
pool = Pool(processes=cpu_count() - 1)

mgr = Manager()
lock = mgr.Lock()
mgrDicTask = mgr.dict()
mgrDicTask['result'] = []

fun = partial(job, mgrDicTask=mgrDicTask, lock=lock)
pool.map_async(fun, data)
pool.close()
pool.join()

print(mgrDicTask['result'])
print('All Done!!!')
1
2
3
4
['a+b = 7', 'a+b = 7', 'a+b = 5']
All Done!!!

Process finished with exit code 0

其他进程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 进程池的另外一种创建方式,跟线程池的创建方式一样。其方法等也相同。
def process_pool_test(url_list):
book_list = []
# 创建进程池
pool = ProcessPoolExecutor(max_workers=20)
start = time.time()
for url in url_list:
time.sleep(0.5)
result = pool.submit(get_book_info, url)
book_list.append(result)

pool.shutdown()
print('time: ', time.time() - start)

book_name_list = []
author_list = []
author_info_list = []
print('book_list: ', len(book_list))
for future in book_list:
book_name_list.extend(future.result()['name'])
author_list.extend(future.result()['author'])
author_info_list.extend(future.result()['info'])
ExcelUtils.write_data_to_excel('bookInfo', book_name_list, author_list, author_info_list)


if __name__ == '__main__':
sys.setrecursionlimit(10000)
url_list = ['https://www.edge.org/library']
for i in range(1, 52):
url_list.append('https://www.edge.org/library?page=%s' % i)

thread_pool_test(url_list)

多进程通信

内容提取神器 beautiful Soup 的用法

进程是系统独立调度核分配系统资源(CPU、内存)的基本单位,进程之间是相互独立的,每启动一个新的进程相当于把数据进行了一次克隆,子进程里的数据修改无法影响到主进程中的数据,不同子进程之间的数据也不能共享,这是多进程在使用中与多线程最明显的区别

但是难道Python多进程中间难道就是孤立的吗?

当然不是,python也提供了多种方法实现了多进程中间的通信和数据共享(可以修改一份数据)

进程队列Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#!/usr/bin/env Python
# -- coding: utf-8 --

"""
@version: v1.0
@author: narutohyc
@file: multiprocessing_queue.py
@Description: 多进程队列使用示例
@time: 2020/5/14 15:53
"""

from multiprocessing import Process, Queue, Manager
from multiprocessing import cpu_count
import os
import time


class Task:
def __init__(self, task_name: str, data: list, **kwargs):
self.task_name = task_name
self.data = data

def __repr__(self):
return f'task_name:{self.task_name} data:{self.data}'


class MultiProcessingQueue:
def __init__(self):
# 进程数
self.num_of_worker = cpu_count()

# 进程队列大小,根据不同的任务需求
self.size_of_queue = 10

def start_work(self):
print("start_work 开始")

# 进程队列
process_list = []

# 新建一个大小为10的队列
work_queue = Queue(self.size_of_queue)

# 进程间共享列表, 其他的还有共享字典等,都是进程安全的
dealed_sample_lst = Manager().list()

# 一个生产者
sent = Process(target=self.productor, args=(work_queue, dealed_sample_lst,))
sent.start()
process_list.append(sent)

# 多个消费者
for _ in range(self.num_of_worker - 1):
process = Process(target=self.consumer, args=(work_queue, dealed_sample_lst,))
process.start()
process_list.append(process)
[process.join() for process in process_list]
print("start_work 结束")
return dealed_sample_lst

def productor(self, work_queue: Queue, dealed_sample_lst):
print("生产者开始工作")
for ii in range(100):
work_queue.put(Task(task_name=f'{str(os.getpid())}-{str(ii)}', data=[ii for _ in range(2)]))
if ii % 30 == 0:
time.sleep(1)
print("生产者休息ing")

'''
JoinableQueue 比Queue多了task_done() 与join()两个函数,多用于生产者消费者问题。
task_done()是用在get()后,发送通知说我get完了
join()是说Queue里所有的task都已处理。
'''
# 这里需要加入结束标识,还有就是JoinableQueue的方式
for _ in range(self.num_of_worker - 1):
work_queue.put(None)
print("生产者工作结束")

def consumer(self, work_queue: Queue, dealed_sample_lst):
while True:
task: Task = work_queue.get()
if task is None:
break

# 处理数据
task.data = [ii * 2 for ii in task.data]
dealed_sample_lst.append(task)
print(task)

print(f'进程{os.getpid()} 处理结束')


def multiprocessing_queue_test():
multiprocessing_queue = MultiProcessingQueue()
dealed_sample_lst = multiprocessing_queue.start_work()
# for sample in dealed_sample_lst:
# print(sample)
print("测试结束")


if __name__ == '__main__':
multiprocessing_queue_test()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
start_work 开始
生产者开始工作
task_name:28868-0 data:[0, 0]
生产者休息ing
task_name:28868-1 data:[2, 2]
...
task_name:28868-6 data:[12, 12]
生产者休息ing
task_name:28868-31 data:[62, 62]
...
task_name:28868-58 data:[116, 116]
生产者休息ing
task_name:28868-61 data:[122, 122]
...
task_name:28868-64 data:[128, 128]
...
生产者休息ing
task_name:28868-91 data:[182, 182]
...
task_name:28868-96 data:[192, 192]
生产者工作结束
进程29208 处理结束
task_name:28868-97 data:[194, 194]
进程20632 处理结束
进程28496 处理结束
task_name:28868-98 data:[196, 196]
进程30200 处理结束
进程26512 处理结束
进程29776 处理结束
task_name:28868-99 data:[198, 198]
进程30072 处理结束
start_work 结束
测试结束

上面的代码结果可以看到我们主进程中可以通过Queue获取子进程中put的数据,实现进程间的通信

JoinableQueue队列

JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理

通知进程是使用共享的信号和条件变量来实现的

  • 参数介绍:
    • maxsize: 是队列中允许最大项数,省略则无大小限制
  • 方法介绍:
    • q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理
      如果调用此方法的次数大于从队列中删除项目的数量将引发ValueError异常
    • q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理
      阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#!/usr/bin/env Python
# -- coding: utf-8 --

"""
@version: v1.0
@author: narutohyc
@file: multiprocessing_queue.py
@Description: 多进程队列使用示例
@time: 2020/5/14 15:53
"""

from multiprocessing import Process, Queue, JoinableQueue, Manager
from multiprocessing import cpu_count
import os, time


class Task:
def __init__(self, task_name: str, data: list, **kwargs):
self.task_name = task_name
self.data = data

def __repr__(self):
return f'task_name:{self.task_name} data:{self.data}'


class MultiProcessingJoinableQueue:
def __init__(self):
# 进程数
self.num_of_worker = cpu_count()

# 进程队列大小,根据不同的任务需求
self.size_of_queue = 10

def start_work(self):
print("start_work 开始")

# 进程队列
process_list = []

# 新建一个大小为10的队列
work_queue = JoinableQueue(self.size_of_queue)

# 进程间共享列表, 其他的还有共享字典等,都是进程安全的
dealed_sample_lst = Manager().list()

# 一个生产者
sent = Process(target=self.productor, args=(work_queue, dealed_sample_lst,))
process_list.append(sent)

# 多个消费者
for _ in range(self.num_of_worker - 1):
process = Process(target=self.consumer, args=(work_queue, dealed_sample_lst,))
process.daemon = True
process_list.append(process)

[process.start() for process in process_list]
# 这里需要注意的一点是,这里join只需要调用生产者(别调消费者的join,否则无法正常退出)
# 消费者不需要,个人感觉应该是生产者那边已经调用了work_queue.join()的方法
# 消费者结束后,整个程序退出
[process.join() for process in process_list[:1]]

print("start_work 结束")
return dealed_sample_lst

def productor(self, work_queue: Queue, dealed_sample_lst):
print("生产者开始工作")
for ii in range(100):
work_queue.put(Task(task_name=f'{str(os.getpid())}-{str(ii)}', data=[ii for _ in range(2)]))
if ii % 30 == 0:
time.sleep(1)
print("生产者休息ing")

print("生产者工作结束")
work_queue.join()

def consumer(self, work_queue: Queue, dealed_sample_lst):
while True:
task: Task = work_queue.get()
if task is None:
break
# 处理数据
task.data = [ii * 2 for ii in task.data]
dealed_sample_lst.append(task)
print(task)
work_queue.task_done()
print(f'进程{os.getpid()} 处理结束')


def multiprocessing_joinablequeue_test():
multiprocessing_joinablequeue = MultiProcessingJoinableQueue()
dealed_sample_lst = multiprocessing_joinablequeue.start_work()
# for sample in dealed_sample_lst:
# print(sample)
print("测试结束")

if __name__ == '__main__':
multiprocessing_joinablequeue_test()

结果输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
start_work 开始
生产者开始工作
task_name:14608-0 data:[0, 0]
生产者休息ing
task_name:14608-1 data:[2, 2]
...
task_name:14608-7 data:[14, 14]
生产者休息ing
task_name:14608-31 data:[62, 62]
...
task_name:14608-60 data:[120, 120]
生产者休息ing
task_name:14608-61 data:[122, 122
...
task_name:14608-90 data:[180, 180]
生产者休息ing
生产者工作结束
task_name:14608-91 data:[182, 182]
...
task_name:14608-93 data:[186, 186]
start_work 结束
测试结束

管道Pipe

Pipe的本质是进程之间的用管道数据传递,而不是数据共享,这和socket有点像

pipe()返回两个连接对象分别表示管道的两端,每端都有send()和recv()函数

如果两个进程试图在同一时间的同一端进行读取和写入那么,这可能会损坏管道中的数据

管道是数据不安全的,多个进程同时收发数据可道引起数据异常,这时候就应该配合锁使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Process, Pipe

def fun1(conn):
print('子进程发送消息:')
conn.send('你好主进程')
print('子进程接受消息:')
print(conn.recv())
conn.close()

if __name__ == '__main__':
conn1, conn2 = Pipe() #关键点,pipe实例化生成一个双向管
p = Process(target=fun1, args=(conn2,)) #conn2传给子进程
p.start()
print('主进程接受消息:')
print(conn1.recv())
print('主进程发送消息:')
conn1.send("你好子进程")
p.join()
print('结束测试')
1
2
3
4
5
6
7
8
9
主进程接受消息:
子进程发送消息:
子进程接受消息:
你好主进程
主进程发送消息:
你好子进程
结束测试

Process finished with exit code 0

上面可以看到主进程和子进程可以相互发送消息

Managers

Queue和Pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据,那么就要用到Managers

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from multiprocessing import Process, Manager

def fun1(dic,lis,index):
dic[index] = 'a'
dic['2'] = 'b'
lis.append(index) #[0,1,2,3,4,0,1,2,3,4,5,6,7,8,9]
#print(l)

if __name__ == '__main__':
with Manager() as manager:
dic = manager.dict()#注意字典的声明方式,不能直接通过{}来定义
l = manager.list(range(5))#[0,1,2,3,4]

process_list = []
for i in range(10):
p = Process(target=fun1, args=(dic,l,i))
p.start()
process_list.append(p)

for res in process_list:
res.join()
print(dic)
print(l)
1
2
{0: 'a', '2': 'b', 3: 'a', 1: 'a', 2: 'a', 4: 'a', 5: 'a', 7: 'a', 6: 'a', 8: 'a', 9: 'a'}
[0, 1, 2, 3, 4, 0, 3, 1, 2, 4, 5, 7, 6, 8, 9]

可以看到主进程定义了一个字典和一个列表,在子进程中,可以添加和修改字典的内容
在列表中插入新的数据,实现进程间的数据共享,即可以共同修改同一份数据

注意事项

无法调用多层生成器(待验证)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/env Python
# -- coding: utf-8 --

"""
@version: v0.1
@author: narutohyc
@file: text.py
@Description:
@time: 2020/5/29 19:51
"""

from multiprocessing import Process, Queue, JoinableQueue, Manager
from multiprocessing import cpu_count
import os, time

from abc import (ABC,
abstractmethod,
ABCMeta)

from comm.logger.logger_config import logger


class SampleIterator(ABC, metaclass=ABCMeta):
def __init__(self):
pass

@abstractmethod
def __iter__(self):
'''
样本处理并返回
'''
pass


class DataSource1(SampleIterator):
def __init__(self):
super(DataSource1, self).__init__()

def __iter__(self):
for ii in range(10):
yield ii


class DataSource2(SampleIterator):
def __init__(self, data_source):
super(DataSource2, self).__init__()
self.data_source = data_source

def __iter__(self):
for ii in self.data_source:
yield ii
class DataSource3(SampleIterator):
def __init__(self, data_source):
super(DataSource3, self).__init__()
self.data_source = data_source

def __iter__(self):
for ii in self.data_source:
yield ii
class DataSource4(SampleIterator):
def __init__(self, data_source):
super(DataSource4, self).__init__()
self.data_source = data_source

def __iter__(self):
for ii in self.data_source:
yield ii


class HUCY():
def __init__(self, data_source=None):
self.num_of_worker = cpu_count()
self.size_of_queue = 2
self.data_source = data_source

def start_work(self):
# 进程队列
process_list = []
# 新建一个大小为10的队列
work_queue = Queue(self.size_of_queue)

# 一个生产者
produce_num = 1

for _ in range(produce_num):
sent = Process(target=self.productor, args=(work_queue,))
sent.start()
process_list.append(sent)

# 多个消费者
for _ in range(self.num_of_worker - produce_num):
process = Process(target=self.consumer, args=(work_queue,))
process.start()
process_list.append(process)

# 这里需要加入结束标识,还有就是JoinableQueue的方式
[process.join() for process in process_list[:produce_num]]
for _ in range(self.num_of_worker - produce_num):
work_queue.put(None)
[process.join() for process in process_list[produce_num:]]
print("start_work 结束")

def productor(self, work_queue):
[work_queue.put(ii) for ii in self.data_source]
logger.info("生产者结束")

def consumer(self, work_queue):
while True:
data = work_queue.get()
if data is None:
break
logger.info(f"数据: {data}")
logger.info("消费者结束")


def hyc_test():
da1=DataSource2(DataSource3(DataSource4(DataSource1())))
da2 = DataSource2(DataSource3(DataSource4(da1)))
da3 = DataSource2(DataSource3(DataSource4(da2)))
da4 = DataSource2(DataSource3(DataSource4(da3)))
da5 = DataSource2(DataSource3(DataSource4(da4)))
da6 = DataSource2(DataSource3(DataSource4(DataSource2(da5))))
hucy = HUCY(da6)
hucy.start_work()


if __name__ == '__main__':
hyc_test()

几个问题:

  • 以上代码 若有多个生产者 就会各自拥有自己的数据生成器,导致数据重复

  • 有些定义的方法 好像会使程序卡住