# -*- coding: utf-8 -*-
import openpyxl
import xlrd
import os

class Sheet:
    def cell_value(self, row, col):
        pass


class XlsxSheet(Sheet):
    def __init__(self, sheet):
        self.sheet = sheet

    def cell_value(self, row, col):
        # openpyxl is 1-based, so shift the 0-based row/col by one
        return self.sheet.cell(row=row + 1, column=col + 1).value


class XlsSheet(Sheet):
    def __init__(self, sheet):
        self.sheet = sheet

    def cell_value(self, row, col):
        return self.sheet.cell_value(row, col)


class _Workbook:
    def sheet_by_index(self, index):
        pass


class XlsxFile(_Workbook):
    def __init__(self, path):
        self.workbook = openpyxl.load_workbook(path)

    def sheet_by_index(self, index):
        sheet = self.workbook[self.workbook.sheetnames[index]]
        return XlsxSheet(sheet)


class XlsFile(_Workbook):
    def __init__(self, path):
        self.workbook = xlrd.open_workbook(path)

    def sheet_by_index(self, index):
        sheet = self.workbook.sheet_by_index(index)
        return XlsSheet(sheet)


class Workbook:
    def __init__(self, path):
        filedir, filename = os.path.split(path)
        _, extname = os.path.splitext(filename)
        if extname == ".xlsx":
            self.impl = XlsxFile(path)
        elif extname == ".xls":
            self.impl = XlsFile(path)
        else:
            raise IOError("unknown file type")

    def sheet_by_index(self, index):
        return self.impl.sheet_by_index(index)
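
A minimal usage sketch (the file name and module name here are hypothetical): the caller reads cells the same way whether the underlying file is .xls or .xlsx.

from workbook import Workbook  # assuming the code above is saved as workbook.py

wb = Workbook("scores.xlsx")   # or "scores.xls" -- the calling code is identical
sheet = wb.sheet_by_index(0)
print(sheet.cell_value(0, 0))  # 0-based row/col for both backends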

Make Sublime Text run .py scripts with the Python inside WSL

Locate C:\Program Files\Sublime Text\Packages\Python.sublime-package

Make a copy, append the .zip suffix, and open it with an archive tool.

We add a file to define a new build system: create WSL Python.sublime-build with the following content

{
    "cmd": ["python.bat", "${file}"],
    "file_regex": "^[ ]*File \"(...*?)\", line ([0-9]*)",
    "path": "C:/Program Files/Sublime Text/helper",
    "selector": "source.python",
}

Roughly this means: files matched by the source.python selector (think of it as all *.py files) are executed by C:/Program Files/Sublime Text/helper/python.bat.

Create the file C:/Program Files/Sublime Text/helper/python.bat with the following content

@echo off
set str=%1
set str=%str:\=/%
set str=%str:C:=/mnt/c%
set str=%str:D:=/mnt/d%
C:\windows\system32\wsl.exe python -u %str%

Restart Sublime Text; when building a *.py file, select WSL Python.

Make Sublime Text compile and run .cpp files with the g++ inside WSL

Locate C:\Program Files\Sublime Text\Packages\C++.sublime-package

Make a copy, append the .zip suffix, and open it with an archive tool.

We add a file to define a new build system: create WSL C++ Single File.sublime-build with the following content

{
    "cmd": ["cpp.bat", "${file}", "${file_path}", "${file_base_name}"],
    "file_regex": "^[ ]*File \"(...*?)\", line ([0-9]*)",
    "path": "C:/Program Files/Sublime Text/helper",
    "selector": "source.c++",
}

Roughly this means: files matched by the source.c++ selector (think of it as all *.cpp files) are handled by C:/Program Files/Sublime Text/helper/cpp.bat.

Create the file C:/Program Files/Sublime Text/helper/cpp.bat with the following content

@echo off
set file=%1
set file_path=%2
set file_base_name=%3
set file=%file:\=/%
set file=%file:C:=/mnt/c%
set file=%file:D:=/mnt/d%
set file_path=%file_path:\=/%
set file_path=%file_path:C:=/mnt/c%
set file_path=%file_path:D:=/mnt/d%

C:\windows\system32\wsl.exe g++ -std=c++17 %file% -o %file_path%/%file_base_name% && C:\windows\system32\wsl.exe %file_path%/%file_base_name%

Restart Sublime Text; when building a *.cpp file, select WSL C++ Single File.

The code below implements three useful things:

  • KillableThread: a thread that can be killed; its join returns the thread function's return value, and re-raises any exception from inside the thread in the thread that called join.
  • The timeout decorator: automatically raises TimeoutError when the time limit is exceeded.
  • The retry decorator: automatically retries on exception; the retry count is configurable, 0 means no retry, and once retries are exhausted the last exception is re-raised.

func_utils.py

# -*- coding: utf-8 -*-
import threading
import time
import inspect
import ctypes
import traceback
import sys, os
from functools import wraps

class KillableThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._return_value = None
        self._exception = None

    @staticmethod
    def _async_raise(tid, exctype):
        """raises the exception, performs cleanup if needed"""
        tid = ctypes.c_long(tid)
        if not inspect.isclass(exctype):
            exctype = type(exctype)
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
        if res == 0:
            raise ValueError("invalid thread id")
        elif res != 1:
            # if it returns a number greater than one, you're in trouble,
            # and you should call it again with exc=NULL to revert the effect
            ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
            raise SystemError("PyThreadState_SetAsyncExc failed")

    def kill(self):
        KillableThread._async_raise(self.ident, SystemExit)

    def run(self):
        try:
            self._return_value = self._target(*self._args, **self._kwargs)
        except Exception as e:
            self._exception = e

    def join(self, timeout=None):
        # The original defined join twice (with and without a timeout); Python
        # would keep only the second definition, so they are merged here with
        # an optional timeout.
        super().join(timeout)
        if self._exception is not None:
            raise self._exception
        return self._return_value


def _get_thread(tid):
    for t in threading.enumerate():
        if t.ident == tid:
            return t
    return None


def _get_frame_stack(tid):
    for thread_id, stack in sys._current_frames().items():
        if thread_id == tid:
            return stack
    return None


def _get_formated_frame_stack(tid):
    info = []
    th = _get_thread(tid)
    stack = _get_frame_stack(tid)
    info.append('%s thread_id=%d' % (th.name, tid))
    for filename, lineno, _, line in traceback.extract_stack(stack):
        info.append('  at %s(%s:%d)' % (line, filename[filename.rfind(os.path.sep) + 1:], lineno))
    return '\n'.join(info)


def timeout(seconds):
    """
    Decorator to execute a function with a specified timeout.

    Args:
    - seconds (int): The time limit in seconds for the function to complete.

    Returns:
    - function: The decorated function.

    Raises:
    - TimeoutError: If the function does not complete within the specified time limit.

    Usage:
        @timeout(seconds=10)
        def my_function():
            # Function body
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            th = KillableThread(target=func, args=args, kwargs=kwargs)
            th.daemon = True
            th.start()
            ret = th.join(seconds)
            if th.is_alive():
                formated_frame_stack = _get_formated_frame_stack(th.ident)
                th.kill()
                raise TimeoutError(f"{repr(func)} timeout. Frame stack:\n{formated_frame_stack}")
            return ret
        return wrapper
    return decorator


def retry(retries=1, retry_interval=0):
    """
    Decorator to retry a function a specified number of times with a given interval between retries.

    Args:
    - retries (int): The number of times the function should be retried if it raises an exception. If set to 1, the function will be attempted initially and retried once.
    - retry_interval (int): The time interval in seconds to wait between retries.

    Returns:
    - function: The decorated function.

    Raises:
    - The original exception raised by the function if all retries fail.

    Usage:
        @retry(retries=2, retry_interval=2)
        def my_function():
            # Function body
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for i in range(retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if i < retries:
                        time.sleep(retry_interval)
                    else:
                        raise e
        return wrapper
    return decorator
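
A quick usage sketch (slow_task is a made-up function for illustration): stacking the two decorators gives a call that is killed when it hangs and retried on failure.

import time
from func_utils import timeout, retry

@retry(retries=2, retry_interval=1)
@timeout(seconds=2)
def slow_task():
    time.sleep(10)  # always exceeds the 2-second limit

try:
    slow_task()
except TimeoutError as e:
    print(e)  # re-raised only after the initial attempt plus 2 retries all time out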

A wrapper around the logging module that fixes a flaw in the standard library's TimedRotatingFileHandler: with multiple processes, the previous day's log file could be deleted by mistake during rotation.

For everyday use you only need to import init_logger and logger.

Source

# -*- coding: utf-8 -*-
import logging, os, sys, time, re
from stat import ST_MTIME
from logging.handlers import BaseRotatingHandler, _MIDNIGHT
# logging.basicConfig(format='%(asctime)s %(levelname)s [%(filename)s->%(funcName)s:%(lineno)d]\t%(message)s',
# level=logging.DEBUG)


class TimedRotatingFileHandler(BaseRotatingHandler):
    """
    Handler for logging to a file, rotating the log file at certain timed
    intervals.

    If backupCount is > 0, when rollover is done, no more than backupCount
    files are kept - the oldest ones are deleted.
    """
    def __init__(self, filename, when='h', interval=1, backupCount=0, encoding=None, delay=False, utc=False, atTime=None):
        BaseRotatingHandler.__init__(self, filename, 'a', encoding, delay)
        self.when = when.upper()
        self.backupCount = backupCount
        self.utc = utc
        self.atTime = atTime
        # Calculate the real rollover interval, which is just the number of
        # seconds between rollovers.  Also set the filename suffix used when
        # a rollover occurs.  Current 'when' events supported:
        # S - Seconds
        # M - Minutes
        # H - Hours
        # D - Days
        # midnight - roll over at midnight
        # W{0-6} - roll over on a certain day; 0 - Monday
        #
        # Case of the 'when' specifier is not important; lower or upper case
        # will work.
        if self.when == 'S':
            self.interval = 1  # one second
            self.suffix = "%Y-%m-%d_%H-%M-%S"
            self.extMatch = r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}(\.\w+)?$"
        elif self.when == 'M':
            self.interval = 60  # one minute
            self.suffix = "%Y-%m-%d_%H-%M"
            self.extMatch = r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}(\.\w+)?$"
        elif self.when == 'H':
            self.interval = 60 * 60  # one hour
            self.suffix = "%Y-%m-%d_%H"
            self.extMatch = r"^\d{4}-\d{2}-\d{2}_\d{2}(\.\w+)?$"
        elif self.when == 'D' or self.when == 'MIDNIGHT':
            self.interval = 60 * 60 * 24  # one day
            self.suffix = "%Y-%m-%d"
            self.extMatch = r"^\d{4}-\d{2}-\d{2}(\.\w+)?$"
        elif self.when.startswith('W'):
            self.interval = 60 * 60 * 24 * 7  # one week
            if len(self.when) != 2:
                raise ValueError("You must specify a day for weekly rollover from 0 to 6 (0 is Monday): %s" % self.when)
            if self.when[1] < '0' or self.when[1] > '6':
                raise ValueError("Invalid day specified for weekly rollover: %s" % self.when)
            self.dayOfWeek = int(self.when[1])
            self.suffix = "%Y-%m-%d"
            self.extMatch = r"^\d{4}-\d{2}-\d{2}(\.\w+)?$"
        else:
            raise ValueError("Invalid rollover interval specified: %s" % self.when)

        self.extMatch = re.compile(self.extMatch, re.ASCII)
        self.interval = self.interval * interval  # multiply by units requested
        # The following line added because the filename passed in could be a
        # path object (see Issue #27493), but self.baseFilename will be a string
        filename = self.baseFilename
        if os.path.exists(filename):
            t = os.stat(filename)[ST_MTIME]
        else:
            t = int(time.time())
        self.rolloverAt = self.computeRollover(t)

    def computeRollover(self, currentTime):
        """
        Work out the rollover time based on the specified time.
        """
        result = currentTime + self.interval
        # If we are rolling over at midnight or weekly, then the interval is already known.
        # What we need to figure out is WHEN the next interval is. In other words,
        # if you are rolling over at midnight, then your base interval is 1 day,
        # but you want to start that one day clock at midnight, not now. So, we
        # have to fudge the rolloverAt value in order to trigger the first rollover
        # at the right time. After that, the regular interval will take care of
        # the rest. Note that this code doesn't care about leap seconds. :)
        if self.when == 'MIDNIGHT' or self.when.startswith('W'):
            # This could be done with less code, but I wanted it to be clear
            if self.utc:
                t = time.gmtime(currentTime)
            else:
                t = time.localtime(currentTime)
            currentHour = t[3]
            currentMinute = t[4]
            currentSecond = t[5]
            currentDay = t[6]
            # r is the number of seconds left between now and the next rotation
            if self.atTime is None:
                rotate_ts = _MIDNIGHT
            else:
                rotate_ts = ((self.atTime.hour * 60 + self.atTime.minute) * 60 +
                             self.atTime.second)

            r = rotate_ts - ((currentHour * 60 + currentMinute) * 60 +
                             currentSecond)
            if r < 0:
                # Rotate time is before the current time (for example when
                # self.rotateAt is 13:45 and it now 14:15), rotation is
                # tomorrow.
                r += _MIDNIGHT
                currentDay = (currentDay + 1) % 7
            result = currentTime + r
            # If we are rolling over on a certain day, add in the number of days until
            # the next rollover, but offset by 1 since we just calculated the time
            # until the next day starts.  There are three cases:
            # Case 1) The day to rollover is today; in this case, do nothing
            # Case 2) The day to rollover is further in the interval (i.e., today is
            #         day 2 (Wednesday) and rollover is on day 6 (Sunday).  Days to
            #         next rollover is simply 6 - 2 - 1, or 3.
            # Case 3) The day to rollover is behind us in the interval (i.e., today
            #         is day 5 (Saturday) and rollover is on day 3 (Thursday).
            #         Days to rollover is 6 - 5 + 3, or 4.  In this case, it's the
            #         number of days left in the current week (1) plus the number
            #         of days in the next week until the rollover day (3).
            # The calculations described in 2) and 3) above need to have a day added.
            # This is because the above time calculation takes us to midnight on this
            # day, i.e. the start of the next day.
            if self.when.startswith('W'):
                day = currentDay  # 0 is Monday
                if day != self.dayOfWeek:
                    if day < self.dayOfWeek:
                        daysToWait = self.dayOfWeek - day
                    else:
                        daysToWait = 6 - day + self.dayOfWeek + 1
                    newRolloverAt = result + (daysToWait * (60 * 60 * 24))
                    if not self.utc:
                        dstNow = t[-1]
                        dstAtRollover = time.localtime(newRolloverAt)[-1]
                        if dstNow != dstAtRollover:
                            if not dstNow:  # DST kicks in before next rollover, so we need to deduct an hour
                                addend = -3600
                            else:           # DST bows out before next rollover, so we need to add an hour
                                addend = 3600
                            newRolloverAt += addend
                    result = newRolloverAt
        return result

    def shouldRollover(self, record):
        """
        Determine if rollover should occur.

        record is not used, as we are just comparing times, but it is needed so
        the method signatures are the same
        """
        t = int(time.time())
        if t >= self.rolloverAt:
            return 1
        return 0

    def getFilesToDelete(self):
        """
        Determine the files to delete when rolling over.

        More specific than the earlier method, which just used glob.glob().
        """
        dirName, baseName = os.path.split(self.baseFilename)
        fileNames = os.listdir(dirName)
        result = []
        prefix = baseName + "."
        plen = len(prefix)
        for fileName in fileNames:
            if fileName[:plen] == prefix:
                suffix = fileName[plen:]
                if self.extMatch.match(suffix):
                    result.append(os.path.join(dirName, fileName))
        if len(result) < self.backupCount:
            result = []
        else:
            result.sort()
            result = result[:len(result) - self.backupCount]
        return result

    def doRollover(self):
        """
        do a rollover; in this case, a date/time stamp is appended to the filename
        when the rollover happens.  However, you want the file to be named for the
        start of the interval, not the current time.  If there is a backup count,
        then we have to get a list of matching filenames, sort them and remove
        the one with the oldest suffix.
        """
        if self.stream:
            self.stream.close()
            self.stream = None
        # get the time that this sequence started at and make it a TimeTuple
        currentTime = int(time.time())
        dstNow = time.localtime(currentTime)[-1]
        t = self.rolloverAt - self.interval
        if self.utc:
            timeTuple = time.gmtime(t)
        else:
            timeTuple = time.localtime(t)
            dstThen = timeTuple[-1]
            if dstNow != dstThen:
                if dstNow:
                    addend = 3600
                else:
                    addend = -3600
                timeTuple = time.localtime(t + addend)
        dfn = self.rotation_filename(self.baseFilename + "." +
                                     time.strftime(self.suffix, timeTuple))

        # Fix for multi-process use: if another process already rotated to this
        # name, append .2, .3, ... instead of touching the existing file, so the
        # previous day's log cannot be deleted by mistake during rollover.
        if os.path.exists(dfn):
            ori_dfn = dfn
            i = 1
            while os.path.exists(dfn):
                i += 1
                dfn = f"{ori_dfn}.{i}"

        self.rotate(self.baseFilename, dfn)
        if self.backupCount > 0:
            for s in self.getFilesToDelete():
                os.remove(s)
        if not self.delay:
            self.stream = self._open()
        newRolloverAt = self.computeRollover(currentTime)
        while newRolloverAt <= currentTime:
            newRolloverAt = newRolloverAt + self.interval
        # If DST changes and midnight or weekly rollover, adjust for this.
        if (self.when == 'MIDNIGHT' or self.when.startswith('W')) and not self.utc:
            dstAtRollover = time.localtime(newRolloverAt)[-1]
            if dstNow != dstAtRollover:
                if not dstNow:  # DST kicks in before next rollover, so we need to deduct an hour
                    addend = -3600
                else:           # DST bows out before next rollover, so we need to add an hour
                    addend = 3600
                newRolloverAt += addend
        self.rolloverAt = newRolloverAt

# name        supports writing several log files at once; for a single file just ignore it and keep 'main'
# echo        whether to also echo log lines to the console
# filedir     custom directory for the log file, defaults to the current directory
# filename    custom log file name; None or False disables file output, True derives the name from the script name
# level       minimum level that will be emitted; with INFO, DEBUG records are filtered out
# fmt         format of each log line; None selects the default format, see the logging module docs
# backupCount defaults to 7: logs rotate daily and the most recent 7 days are kept; None disables daily rotation
def init_logger(name="main", echo=True, filedir=None, filename=None, level=None, fmt=None, backupCount=7):
    fmt = fmt or '%(asctime)s %(levelname)s [%(filename)s->%(funcName)s:%(lineno)d]\t%(message)s'
    formatter = logging.Formatter(fmt)  # log line format
    logger = logging.getLogger(name)
    if level:
        if level == "DEBUG" or level == logging.DEBUG:
            logger.setLevel(level=logging.DEBUG)
        elif level == "INFO" or level == logging.INFO:
            logger.setLevel(level=logging.INFO)
        elif level == "WARN" or level == "WARNING" or level == logging.WARN:
            logger.setLevel(level=logging.WARN)
        elif level == "FATAL" or level == logging.FATAL:
            logger.setLevel(level=logging.FATAL)
        else:
            logger.setLevel(level=level)
    if not logger.hasHandlers():
        if echo:
            console_handler = logging.StreamHandler()  # console handler
            console_handler.setFormatter(formatter)
            console_handler.setLevel(logging.DEBUG)
            logger.addHandler(console_handler)
        if filename:
            if type(filename) == bool and filename:
                filename = get_logfile_name()
                if filedir is not None:
                    _, filename = os.path.split(filename)
                    filename = os.path.join(filedir, filename)
            else:
                if filedir is not None:
                    filename = os.path.join(filedir, filename)

            filedir, _ = os.path.split(filename)
            if filedir and not os.path.exists(filedir):
                os.makedirs(filedir)

            if backupCount:
                file_handler = TimedRotatingFileHandler(filename=filename, when='MIDNIGHT', interval=1, backupCount=backupCount, encoding='utf-8')
            else:
                file_handler = logging.FileHandler(filename=filename, encoding='utf-8')
            file_handler.setFormatter(formatter)
            file_handler.setLevel(logging.DEBUG)
            logger.addHandler(file_handler)
    logger.info(f"logger({name or ''}) initialized. [level] {level} [format] {fmt} [filename] {filename}")
    return logger


def get_logger(name=None):
    return logging.getLogger(name)


def get_logfile_name():
    path, filename = os.path.split(sys.argv[0])
    name, ext = os.path.splitext(filename)
    return os.path.join(path, f"{name}.log")


logger = get_logger(name="main")

Usage


from utils.logger import logger, init_logger

if __name__ == '__main__':
    init_logger(level="INFO", echo=True, filename=True, filedir="logs")

    logger.debug("test")
    logger.info("test")
    logger.warning("test")
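
Because name selects the underlying logging.Logger, several log files can be written side by side. A small sketch (the logger names and file names here are made up for illustration):

from utils.logger import init_logger

access_logger = init_logger(name="access", filename="access.log", filedir="logs")
error_logger = init_logger(name="error", filename="error.log", filedir="logs")

access_logger.info("GET /index")          # goes to logs/access.log
error_logger.warning("disk almost full")  # goes to logs/error.log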

Create or edit ~/.screenrc

Add

termcapinfo xterm|xterms|xs|rxvt ti@:te@

This takes effect only for the current user. To apply it to everyone, edit /etc/screenrc (vim /etc/screenrc) instead:

find the line above and uncomment it, or add it if it is not there.

multi_process, multi_thread and single_thread wrap multi-process, multi-thread and single-thread task execution behind one uniform calling convention, which makes development and debugging easier.

When refactoring, first turn the existing loop into a call to single_thread, which should behave exactly like the original code; switching to threads or processes is then just replacing single_thread with multi_thread or multi_process.

concurrent_task.py

# -*- coding: utf-8 -*-
import multiprocessing
import queue
import threading


def call_helper(func, common_data, respective_data):
    args = []
    kw = {}
    if common_data:
        assert(isinstance(common_data, list) or isinstance(common_data, tuple) or isinstance(common_data, dict))
        if isinstance(common_data, list) or isinstance(common_data, tuple):
            args += common_data
        elif isinstance(common_data, dict):
            kw.update(common_data)
    if respective_data:
        assert(isinstance(respective_data, list) or isinstance(respective_data, tuple) or isinstance(respective_data, dict))
        if isinstance(respective_data, list) or isinstance(respective_data, tuple):
            args += respective_data
        else:
            kw.update(respective_data)
    return func(*args, **kw)


def _func(func, common_data, queue_in, queue_out):
    # Worker loop: pull one task at a time until the input queue is drained.
    while True:
        try:
            (i, respective_data) = queue_in.get(False)
            r = call_helper(func, common_data, respective_data)
            if r is not None:
                queue_out.put((i, r))
        except queue.Empty:
            break

"""
用多进程执行func函数
Args:
process_num: 并发的进程数量
func: 回调函数,任务的函数体,需要接收通过common_data和respective_data_list传递的参数
common_data: 所有func共用的数据,应为list/tuple/dict类型之一或None,为dict类型时作为具名参数传递
respective_data_list: 每个func各自的数据,应为list/tuple/dict类型的list,为dict类型的list时作为具名参数传递

Returns:
list: func返回值的列表,长度等于len(respective_data_list),顺序和respective_data_list中的参数顺序保持一致
"""
def multi_process(process_num, func, common_data, respective_data_list):
manager = multiprocessing.Manager()
queue_in = manager.Queue()
for i, e in enumerate(respective_data_list):
queue_in.put((i,e))
queue_out = manager.Queue()
processes = []
for i in range(0, process_num):
proc = multiprocessing.Process(target=_func, args=(func, common_data, queue_in, queue_out))
processes.append(proc)
for proc in processes:
proc.start()
for proc in processes:
proc.join()
if not queue_out.empty():
ret = [None] * len(respective_data_list)
while True:
try:
(i, ret_value) = queue_out.get(False)
ret[i] = ret_value
except queue.Empty as e:
break
return ret

"""
用多线程执行func函数
Args:
thread_num: 并发的线程数量
func: 回调函数,任务的函数体,需要接收通过common_data和respective_data_list传递的参数
common_data: 所有func共用的数据,应为list/tuple/dict类型之一或None,为dict类型时作为具名参数传递
respective_data_list: 每个func各自的数据,应为list/tuple/dict类型的list,为dict类型的list时作为具名参数传递

Returns:
list: func返回值的列表,长度等于len(respective_data_list),顺序和respective_data_list中的参数顺序保持一致
"""
def multi_thread(thread_num, func, common_data, respective_data_list):
queue_in = queue.Queue()
for i, e in enumerate(respective_data_list):
queue_in.put((i,e))
queue_out = queue.Queue()
threads = []
for i in range(0, thread_num):
thread = threading.Thread(target=_func, args=(func, common_data, queue_in, queue_out))
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
if not queue_out.empty():
ret = [None] * len(respective_data_list)
while True:
try:
(i, ret_value) = queue_out.get(False)
ret[i] = ret_value
except queue.Empty as e:
break
return ret


"""
用单线程执行func函数
Args:
_dummy_num: 无用的参数,仅为了和其他同类函数统一调用格式
func: 回调函数,任务的函数体,需要接收通过common_data和respective_data_list传递的参数
common_data: 所有func共用的数据,应为list/tuple/dict类型之一或None,为dict类型时作为具名参数传递
respective_data_list: 每个func各自的数据,应为list/tuple/dict类型的list,为dict类型的list时作为具名参数传递

Returns:
list: func返回值的列表,长度等于len(respective_data_list),顺序和respective_data_list中的参数顺序保持一致
"""
def single_thread(_dummy_num, func, common_data, respective_data_list):
ret = []
for i, respective_data in enumerate(respective_data_list):
r = call_helper(func, common_data, respective_data)
if r is not None:
if not ret:
ret = [None] * len(respective_data_list)
ret[i] = r
return ret
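
A quick usage sketch (scale is a made-up worker): the three entry points share one signature, so switching execution models is a one-word change.

from concurrent_task import single_thread, multi_thread, multi_process

def scale(factor, x):  # factor arrives via common_data, x via respective_data_list
    return factor * x

if __name__ == '__main__':
    tasks = [(1,), (2,), (3,)]
    print(single_thread(0, scale, (10,), tasks))  # [10, 20, 30]
    print(multi_thread(4, scale, (10,), tasks))   # same result with 4 threads
    print(multi_process(4, scale, (10,), tasks))  # same result with 4 processes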

awk, one of the three musketeers of Linux text processing, is extremely powerful and fairly involved to use. What it does best is extracting values by column, and awk combined with grep and xargs can get an enormous amount done. There are plenty of tutorials online, so here I will demonstrate the most practical awk features by parsing a csv file.

Contents of test.csv

姓名 成绩
张三 95
李四
王五 98
平均分 96.5

As raw text

姓名,成绩
张三,95
李四,
王五,98
平均分,96.5

The task

Print the first and last columns of every row except the header (the first row) and the summary (the last row); if the last column of a row is empty, skip that row.
For the example above, the output should be

张三 95
王五 98

The pipeline

cat test.csv|awk 'NR>1'|awk 'NR!=1 {print prev} {prev=$0}'|awk -F, '$NF ~ /[^\t\r\n ]/ {print $1,$NF}'

awk is used three times:
The first, awk 'NR>1', removes the first line.
The second, awk 'NR!=1 {print prev} {prev=$0}', removes the last line.
The third, awk -F, '$NF ~ /[^\t\r\n ]/ {print $1,$NF}', sets , as the field separator and prints the first and last columns whenever the last column is non-empty.
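
For comparison, the same filtering written in Python (a small sketch; it reads the test.csv shown above):

import csv

with open("test.csv", newline="", encoding="utf-8") as f:
    rows = list(csv.reader(f))

for row in rows[1:-1]:            # drop the header and the summary row
    if row and row[-1].strip():   # skip rows whose last column is empty
        print(row[0], row[-1])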

awk command format

awk

options

The most commonly used option is -F, which sets the field separator.

pattern & action

The pattern works like an if condition; it may be wrapped in () or left bare, and multiple conditions are joined with && or ||. The action runs only when the pattern matches. Omitting the pattern is the same as a condition that is always true, i.e. the action runs on every line. The action can be omitted together with its {}, in which case the default action runs and prints the whole line. An action may consist of several statements separated by ;.
The pattern {action} pair may be repeated, meaning every input line is run through each pattern {action} pair.

Special symbols available in pattern and action:

line number: NR
number of fields: NF
NF is a number, while $NF is the value of the last field.

file

The file to process. awk is usually fed text from a pipe, so file is usually omitted.

A practical example

Get the IPv4 address configured on each network interface

ifconfig|awk 'prev ~ /.+:/ && $1=="inet" {print prev,$2} {prev=$1}'

The output of ifconfig looks like this

ens33: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
        inet 192.168.1.242 netmask 255.255.255.0 broadcast 192.168.1.255
        inet6 fe80::20c:29ff:fe7d:e0 prefixlen 64 scopeid 0x20<link>
        ether 00:0c:29:7d:00:e0 txqueuelen 1000 (Ethernet)
        RX packets 523230074 bytes 791300064486 (791.3 GB)
        RX errors 0 dropped 0 overruns 0 frame 0
        TX packets 17037016 bytes 1268820578 (1.2 GB)
        TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0

lo: flags=73<UP,LOOPBACK,RUNNING> mtu 65536
        inet 127.0.0.1 netmask 255.0.0.0
        inet6 ::1 prefixlen 128 scopeid 0x10<host>
        loop txqueuelen 1000 (Local Loopback)
        RX packets 138 bytes 13090 (13.0 KB)
        RX errors 0 dropped 0 overruns 0 frame 0
        TX packets 138 bytes 13090 (13.0 KB)
        TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0

The output after processing is

ens33: 192.168.1.242
lo: 127.0.0.1

By default, adding a user on Linux waits for the New password: prompt before the password can be entered, so a user cannot be added and given a password in a single command. That makes adding users from a shell script, or in bulk, difficult. With expect we can build our own non-interactive command that adds a user and sets its password.

expect

expect is an executable tool: it runs an expect script in which the answers to a shell interaction are scripted ahead of time.

If expect is not installed, install it first

apt install expect

An expect script is built mainly from three commands, spawn/expect/send:

  • spawn is followed by the command that starts the interaction
  • expect is followed by the interactive prompt to wait for
  • send is followed by the characters to type once the prompt appears

A non-interactive user-creation script

#!/usr/bin/expect
set username [lindex $argv 0]
set password [lindex $argv 1]
spawn adduser --gecos "" $username
expect "*password"
send "$password\n"
expect "*password"
send "$password\n"
expect eof

Save it as, say, adduserwithpw.sh; running ./adduserwithpw.sh user1 pass then adds a user named user1 with password pass.

Official site: https://grpc.io/
Chinese documentation: http://doc.oschina.net/grpc/

Environment setup

Install the dependencies

apt install -y build-essential autoconf libtool pkg-config cmake

Clone gRPC repo

git clone --recurse-submodules -b v1.56.0 --depth 1 --shallow-submodules https://github.com/grpc/grpc

C++: build and install gRPC and Protocol Buffers

export MY_INSTALL_DIR=$HOME/.local
mkdir -p $MY_INSTALL_DIR
export PATH="$MY_INSTALL_DIR/bin:$PATH"

mkdir -p cmake/build
pushd cmake/build
cmake -DgRPC_INSTALL=ON \
      -DgRPC_BUILD_TESTS=OFF \
      -DCMAKE_INSTALL_PREFIX=$MY_INSTALL_DIR \
      ../..
make -j 4
make install
popd

This installs the gRPC headers and libraries, along with the protoc tool.

Python: install gRPC and the gRPC tools (which include protoc)

pip install grpcio
pip install grpcio-tools

Defining the interface and the data

The .proto file

A .proto file defines the data (the request and reply messages) and the interface (the service)

syntax = "proto3"; // 表示这个文件遵循的proto语法的版本

package calc; // 对C++来说表示生成代码的namespace,如果不指定则为全局

// 定义服务接口
service Calc {
rpc Add (AddRequest) returns (AddReply) {}
}

// 定义Add方法的请求参数
message AddRequest {
int32 a = 1;
int32 b = 2;
}

// 定义Add方法的返回数据
message AddReply {
int32 sum = 1;
}

Generating code from the .proto file

The protoc tool compiles a .proto file into source code for a specific programming language.
The .proto file describes the data and interface definitions.
For example, generating C++ code from a .proto:

protoc --cpp_out=./output_directory ./protos/proto_file.proto

This creates a protos folder inside output_directory containing *.pb.h and *.pb.cc files named after the .proto.
One detail: the protos folder appears because the .proto file was given with a path, and the output path mirrors it. To generate the .h and .cc files directly in the target folder, pass the directory of the .proto with the -I option and give the .proto file name without a path, i.e.

protoc --cpp_out=./output_directory -I./protos proto_file.proto

When the project is built with cmake, this generation step usually goes into CMakeLists.txt as an add_custom_command.
The .h and .cc generated so far only wrap the data (the message definitions in the .proto) and contain none of the glue for the interface (the service definition). Generating the service code requires grpc_cpp_plugin; adding a few options to the protoc command generates the interface and data code in one go.
protoc --cpp_out=./output_directory --grpc_out=./output_directory --plugin=protoc-gen-grpc=/bin/grpc_cpp_plugin -I./protos proto_file.proto

This generates the *.pb.h and *.pb.cc files named after the .proto in output_directory, plus *.grpc.pb.h and *.grpc.pb.cc; the *.grpc.pb.* files contain the code generated from the service.

The Python version of protoc is installed as a module and is invoked like this

python -m grpc_tools.protoc --python_out=./output_directory --pyi_out=./output_directory --grpc_out=./output_directory --plugin=protoc-gen-grpc=/bin/grpc_python_plugin -I./protos proto_file.proto

Incidentally, the protoc built from C++ source above can generate Python code too, but the Python protoc module cannot generate the C++ code.

Project layout

This layout is not mandatory, but I find it convenient: protos and protos_gen are shared by the client and the server and must stay identical, so a single copy of each is kept. protos_gen means "generated from protos"; it is the output_directory of the commands above.

.
├── calc_client
│   ├── CMakeLists.txt
│   └── src
│       └── main.cpp
├── calc_server
│   ├── CMakeLists.txt
│   └── src
│       └── main.cpp
├── protos
│   └── calc.proto
└── protos_gen
    ├── calc.grpc.pb.cc
    ├── calc.grpc.pb.h
    ├── calc.pb.cc
    └── calc.pb.h

Building with CMake

cmake_minimum_required(VERSION 3.10)
project(calc_server CXX C)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++17")

set(CMAKE_C_FLAGS_DEBUG " -std=c99 -g -ggdb -O0 -Wall -Wno-unused-function -fpic -fPIC -D_DEBUG")
set(CMAKE_CXX_FLAGS_DEBUG " -std=c++17 -g -ggdb -O0 -Wall -Wno-unused-function -fpic -fPIC -D_DEBUG")

set(CMAKE_C_FLAGS_RELEASE " -std=c99 -O3 -Wall -Wno-unused-function -fpic -fPIC")
set(CMAKE_CXX_FLAGS_RELEASE " -std=c++17 -O3 -Wall -Wno-unused-function -fpic -fPIC")

# Find Protobuf installation
# Looks for protobuf-config.cmake file installed by Protobuf's cmake installation.
option(protobuf_MODULE_COMPATIBLE TRUE)
find_package(Protobuf CONFIG REQUIRED)
message(STATUS "Using protobuf ${Protobuf_VERSION}")

# Find gRPC installation
# Looks for gRPCConfig.cmake file installed by gRPC's cmake installation.
find_package(gRPC CONFIG REQUIRED)
message(STATUS "Using gRPC ${gRPC_VERSION}")

include_directories(
    /include       # replace with the include directory under your gRPC install prefix
    include
    ../protos_gen  # directory holding the .h files generated by protoc
)
link_directories(
    /lib64  # replace with the lib64 directory under your gRPC install prefix
    lib
)

aux_source_directory(src SRC_LIST)
aux_source_directory(../protos_gen PROTOBUF_SRC_LIST)  # directory holding the .cc files generated by protoc
add_executable(${PROJECT_NAME}
    ${SRC_LIST}
    ${PROTOBUF_SRC_LIST}
)

target_link_libraries(${PROJECT_NAME}
    pthread
    absl::flags
    absl::flags_parse
    protobuf::libprotobuf
    gRPC::grpc++_reflection
    gRPC::grpc++
)

Both the server's and the client's CMakeLists.txt can reuse the template above.

C++ server

#include <iostream>

#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/strings/str_format.h"

#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>

#include "calc.grpc.pb.h"

using namespace std;

// Command line flag: server port, default 50051
ABSL_FLAG(uint16_t, port, 50051, "Server port for the service");

class CalcServiceImpl final : public calc::Calc::Service { // inherit and implement the interface defined in calc.proto
    grpc::Status Add(grpc::ServerContext* context, const calc::AddRequest* request, calc::AddReply* reply) override {
        // Read the arguments from the request
        auto a = request->a();
        auto b = request->b();

        // Run the real business logic -- a simple addition here. The ServiceImpl can
        // act as a proxy that holds a pointer to a business object and forwards calls.
        auto sum = a + b;

        // Fill in the reply fields as the return value
        reply->set_sum(sum);
        return grpc::Status::OK;
    }
};

int main(int argc, char** argv) {
    // Parse command line flags
    absl::ParseCommandLine(argc, argv);
    uint16_t port = absl::GetFlag(FLAGS_port);
    // Build the server address string
    std::string server_address = absl::StrFormat("0.0.0.0:%d", port);

    // Create the service implementation
    CalcServiceImpl service;

    // Enable the default health check service and the reflection plugin
    grpc::EnableDefaultHealthCheckService(true);
    grpc::reflection::InitProtoReflectionServerBuilderPlugin();
    // Create the server builder
    grpc::ServerBuilder builder;

    // Listen on the given address without any authentication mechanism.
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());

    // Register the service implementation with the builder
    builder.RegisterService(&service);

    // Build and start the server
    std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
    std::cout << "Server listening on " << server_address << std::endl;

    // Block until the server shuts down
    server->Wait();

    return 0;
}

A different port can be given at startup to override the default

./calc-server --port=12345

C++ client

#include <iostream>
#include <memory>

#include "absl/flags/flag.h"
#include "absl/flags/parse.h"

#include <grpcpp/grpcpp.h>

#include "calc.grpc.pb.h" // gRPC service definitions generated by the Protocol Buffers compiler

ABSL_FLAG(std::string, target, "localhost:50051", "Server address");

using namespace std;

class CalcClient {
public:
    CalcClient(shared_ptr<grpc::Channel> channel)
        : _stub(calc::Calc::NewStub(channel)) {} // create the stub from the channel

    // Perform the addition
    int Add(int a, int b) {
        // Build the request
        calc::AddRequest request;
        request.set_a(a);
        request.set_b(b);

        // Container for the reply
        calc::AddReply reply;

        // Client context for the call
        grpc::ClientContext context;

        // The actual RPC call
        grpc::Status status = _stub->Add(&context, request, &reply);

        // Act on the call status
        if (status.ok()) {
            return reply.sum(); // return the sum
        } else {
            cout << status.error_code() << ": " << status.error_message() << endl; // print the error
            return 0;
        }
    }

private:
    unique_ptr<calc::Calc::Stub> _stub; // gRPC stub
};

int main(int argc, char** argv) {
    absl::ParseCommandLine(argc, argv); // parse command line flags
    string target_str = absl::GetFlag(FLAGS_target); // read the server address flag
    cout << "target_str=" << target_str << endl;

    // Create the client
    CalcClient client(grpc::CreateChannel(target_str, grpc::InsecureChannelCredentials()));

    int a, b;
    while (cin >> a >> b) { // read two ints from stdin and perform the addition
        auto sum = client.Add(a, b);
        cout << a << " + " << b << " = " << sum << endl; // print the result
    }

    return 0;
}

Likewise, the server IP and port can be given at startup to override the defaults

./calc-client --target=127.0.0.1:12345

Python server

from concurrent import futures

import grpc
import sys
sys.path.append("../protos_gen")
from calc_pb2 import AddReply  # generated by protoc
from calc_pb2_grpc import CalcServicer, add_CalcServicer_to_server  # generated by protoc


class Calc(CalcServicer):
    def Add(self, request, context):
        return AddReply(sum=request.a + request.b)


def main():
    port = '50051'
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    add_CalcServicer_to_server(Calc(), server)
    server.add_insecure_port('0.0.0.0:' + port)
    server.start()
    print("Server started, listening on " + port)
    server.wait_for_termination()


if __name__ == '__main__':
    main()

Python client

import grpc
import sys
sys.path.append("../protos_gen")
from calc_pb2 import AddRequest  # generated by protoc
from calc_pb2_grpc import CalcStub  # generated by protoc


class Calc:
    def __init__(self, server):
        self.channel = grpc.insecure_channel(server)
        self.stub = CalcStub(self.channel)

    def __del__(self):
        del self.stub
        self.channel.close()

    def Add(self, **kw):
        response = self.stub.Add(AddRequest(**kw))
        return response.sum


def main():
    client = Calc(server='localhost:50051')
    s = client.Add(a=1, b=2)
    print(f"1+2={s}")


if __name__ == '__main__':
    main()

For new code it is best not to use the approach below: define BASE_PATH in a configuration file and build every path with os.path.join, and the code is cross-platform by construction (see the sketch below). For legacy code, the to_os_path below is a quick way to gain platform compatibility.
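
A minimal sketch of the recommended pattern for new code (the BASE_PATH values are made-up examples following the mount-naming rule used later in this post):

import os
import sys

# chosen once, in a config file
BASE_PATH = "/mnt/107e/NAV" if sys.platform == "linux" else r"\\192.168.1.107\e\NAV"

# everywhere else only relative parts are joined; no separator is hard-coded
path = os.path.join(BASE_PATH, "abc.txt")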

import sys
import os
import re

# Normalize the given path for the current platform so the same path works on both Windows and Linux.
# For shared or mounted paths this requires that the share is already mounted under /mnt following the
# naming rule (on Linux), or that a connection has been established via net use (on Windows).
# Mapping between Windows shares/drives and Linux mount points:
# \\192.168.1.107\e       <=> /mnt/107e
# X:                      <=> /mnt/X
# \\192.168.1.99\database <=> /mnt/99database
# Usage:
# change open(r"\\192.168.1.107\e\NAV\abc.txt")
# into   open(to_os_path(r"\\192.168.1.107\e\NAV\abc.txt"))
# and the call is platform compatible
def to_os_path(path):
    if sys.platform == 'linux':
        return to_linux_path(path)
    else:
        return to_windows_path(path)


def to_linux_path(path):
    m = re.match(r"\\\\\d+\.\d+\.\d+\.(\d+)\\([^\\])", path)
    if m:  # looks like a Windows network share path
        return '/mnt/' + m.group(1) + m.group(2) + path[m.span()[1]:].replace('\\', '/')
    m = re.match(r"([a-zA-Z]):\\", path)
    if m:  # looks like a Windows local absolute path
        return '/mnt/' + m.group(1) + '/' + path[m.span()[1]:].replace('\\', '/')

    # already a Linux absolute path, or a relative Windows/Linux path
    return path.replace('\\', '/')


def to_windows_path(path):
    m = re.match(r"/mnt/(\d+)([^\\])", path)
    if m:
        return r"\\192.168.1.{}\{}{}".format(m.group(1), m.group(2), path[m.span()[1]:].replace('/', '\\'))
    m = re.match(r"/mnt/([a-zA-Z])\b", path)
    if m:
        return m.group(1) + ":" + path[m.span()[1]:].replace('/', '\\')
    return path.replace('/', '\\')
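
A few checks that exercise the mapping rules above (callable on either platform, since the conversion functions only inspect the string):

if __name__ == '__main__':
    print(to_linux_path(r"\\192.168.1.107\e\NAV\abc.txt"))  # /mnt/107e/NAV/abc.txt
    print(to_linux_path(r"X:\tmp\a.txt"))                   # /mnt/X/tmp/a.txt
    print(to_windows_path("/mnt/99database/reports"))       # \\192.168.1.99\database\reports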