Python多进程/多线程并发任务

通过 multi_process、multi_thread、single_thread 把多进程、多线程、单线程执行任务封装成统一调用格式,方便开发调试。

重构时可以先将原有的循环代码改造成调用single_thread,本质上应该和原来没有区别,然后将single_thread替换成multi_thread/multi_process即可切换到多线程或多进程。

concurrent_task.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# -*- coding: utf-8 -*-
import multiprocessing
import queue
import threading


def call_helper(func, common_data, respective_data):
args = []
kw = {}
if common_data:
assert(isinstance(common_data, list) or isinstance(common_data, tuple) or isinstance(common_data, dict))
if isinstance(common_data, list) or isinstance(common_data, tuple):
args += common_data
elif isinstance(common_data, dict):
kw.update(common_data)
if respective_data:
assert(isinstance(respective_data, list) or isinstance(respective_data, tuple) or isinstance(respective_data, dict))
if isinstance(respective_data, list) or isinstance(respective_data, tuple):
args += respective_data
else:
kw.update(respective_data)
return func(*args, **kw)


def _func(func, common_data, queue_in, queue_out):
while True:
try:
(i, respective_data) = queue_in.get(False)
r = call_helper(func, common_data, respective_data)
if r is not None:
queue_out.put((i, r))
except queue.Empty as e:
break

"""
用多进程执行func函数
Args:
process_num: 并发的进程数量
func: 回调函数,任务的函数体,需要接收通过common_data和respective_data_list传递的参数
common_data: 所有func共用的数据,应为list/tuple/dict类型之一或None,为dict类型时作为具名参数传递
respective_data_list: 每个func各自的数据,应为list/tuple/dict类型的list,为dict类型的list时作为具名参数传递

Returns:
list: func返回值的列表,长度等于len(respective_data_list),顺序和respective_data_list中的参数顺序保持一致
"""
def multi_process(process_num, func, common_data, respective_data_list):
manager = multiprocessing.Manager()
queue_in = manager.Queue()
for i, e in enumerate(respective_data_list):
queue_in.put((i,e))
queue_out = manager.Queue()
processes = []
for i in range(0, process_num):
proc = multiprocessing.Process(target=_func, args=(func, common_data, queue_in, queue_out))
processes.append(proc)
for proc in processes:
proc.start()
for proc in processes:
proc.join()
if not queue_out.empty():
ret = [None] * len(respective_data_list)
while True:
try:
(i, ret_value) = queue_out.get(False)
ret[i] = ret_value
except queue.Empty as e:
break
return ret

"""
用多线程执行func函数
Args:
thread_num: 并发的线程数量
func: 回调函数,任务的函数体,需要接收通过common_data和respective_data_list传递的参数
common_data: 所有func共用的数据,应为list/tuple/dict类型之一或None,为dict类型时作为具名参数传递
respective_data_list: 每个func各自的数据,应为list/tuple/dict类型的list,为dict类型的list时作为具名参数传递

Returns:
list: func返回值的列表,长度等于len(respective_data_list),顺序和respective_data_list中的参数顺序保持一致
"""
def multi_thread(thread_num, func, common_data, respective_data_list):
queue_in = queue.Queue()
for i, e in enumerate(respective_data_list):
queue_in.put((i,e))
queue_out = queue.Queue()
threads = []
for i in range(0, thread_num):
thread = threading.Thread(target=_func, args=(func, common_data, queue_in, queue_out))
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
if not queue_out.empty():
ret = [None] * len(respective_data_list)
while True:
try:
(i, ret_value) = queue_out.get(False)
ret[i] = ret_value
except queue.Empty as e:
break
return ret


"""
用单线程执行func函数
Args:
_dummy_num: 无用的参数,仅为了和其他同类函数统一调用格式
func: 回调函数,任务的函数体,需要接收通过common_data和respective_data_list传递的参数
common_data: 所有func共用的数据,应为list/tuple/dict类型之一或None,为dict类型时作为具名参数传递
respective_data_list: 每个func各自的数据,应为list/tuple/dict类型的list,为dict类型的list时作为具名参数传递

Returns:
list: func返回值的列表,长度等于len(respective_data_list),顺序和respective_data_list中的参数顺序保持一致
"""
def single_thread(_dummy_num, func, common_data, respective_data_list):
ret = []
for i, respective_data in enumerate(respective_data_list):
r = call_helper(func, common_data, respective_data)
if r is not None:
if not ret:
ret = [None] * len(respective_data_list)
ret[i] = r
return ret