0%

需求情景:

存放日志或接收并存储数据的目录,为了防止程序出错时疯狂写日志硬盘被日志或存储的数据塞满。

原理,创建一个固定大小的img文件,映射成一个目录

1
2
3
4
5
6
7
8
# 创建Log.img大小1G
dd if=/dev/zero of=./Log.img bs=1M count=1024
# 格式化 ext4
mkfs.ext4 ./Log.img
# 创建挂载目录
mkdir ./Log
# 挂载
sudo mount -o loop ./Log.img ./Log

搭建PPTP服务器

参考:https://blog.csdn.net/sanve/article/details/80882731

安装

1
apt install pptpd

配置IP

1
vim /etc/pptpd.conf

查找并解开这3处的注释并修改相应配置

1
2
3
#bcrelay eth1 
#localip 192.168.10.1
#remoteip 192.168.10.234-238,192.168.10.245

bcrelay的意思我理解是来自虚拟局域网的广播要从哪个物理网卡转发出去

localip的意思本机是VPN服务器的IP地址

remoteip的意思是当有远程VPN客户端连接上来时被分配的IP段

我认为应该把虚拟局域网的网段和物理局域网分开,而且localip和remoteip应该在同一个网段,网上有localip和remoteip不应该在同一个网段的说法我并不认同。

配置DNS

1
vim /etc/ppp/pptpd-options 

找到ms-dns解开注释并修改相应配置

1
2
ms-dns 114.114.114.114
ms-dns 8.8.8.8

配置用户名密码

1
vim /etc/ppp/chap-secrets 
1
2
3
# Secrets for authentication using CHAP
# client server secret IP addresses
<username> pptpd <password> *

修改配置后重启PPTP服务器

1
service pptpd restart 

打开IPv4转发

1
vim /etc/sysctl.conf 

找到并修改或添加

1
net.ipv4.ip_forward = 1 

使配置生效

1
2
sysctl -p service
procps restart

在网关上添加指向VPN网段的静态路由,设置下一跳地址为架设VPN服务器主机的IP,这样整个局域网都是可以通过VPN正常访问的。

如果没有操作网关的权限,也可以在需要访问的主机上添加静态路由

1
ip route add 192.168.10.0/24 via 192.168.1.99 

这里192.168.1.99就是架设VPN服务器的主机IP

Ubuntu PPTP客户端

参考:https://www.jianshu.com/p/1680c721f397 http://www.linuxfly.org/post/641/

1
pptpsetup --create 连接名 --server VPN服务器地址 --username 用户名 --password 密码 --encrypt

启动VPN连接

1
pon 连接名

关闭VPN连接

1
poff 连接名

查看路由表

1
route -n

添加路由,将通过VPN接入的物理局域网网段路由指定通过ppp0网口转发

1
route add -net 192.168.1.0 netmask 255.255.255.0 dev ppp0 

每次重连后路由会丢失,需要重新配置,可以用下面的方法在ppp0 up时自动添加路由

pptp连接时自启动添加路由

参考:https://blog.csdn.net/qq_27434019/article/details/102920504

1
vim /etc/ppp/peers/连接名 

增加ipparam一行

1
ipparam 连接名 

下面这步似乎不必要,不清楚作用

1
vim /etc/network/interfaces 

增加

1
2
3
auto tunnel
iface tunnel inet ppp
provider 连接名

======================================

新建脚本文件并修改权限

1
2
touch /etc/ppp/ip-up.d/连接名
chmod a+x /etc/ppp/ip-up.d/连接名

编辑脚本

1
vim /etc/ppp/ip-up.d/连接名 

在脚本中加入添加路由的语句

1
route add -net 192.168.1.0 netmask 255.255.255.0 dev ppp0

可以poff 再 pon 再 route -n 看看路由是否自动添加

服务器其他用户反馈读取数据库很卡。

通过 sar -d -p 3 命令发现硬盘占用率比较高

通过 iotop 命令发现主要是被一个名为 [jbd2/sda2-8] 的进程占用

网上说法是 (13条消息) 性能分析之IO分析-jbd2引起的IO高_hualusiyu的专栏-CSDN博客

1
jbd2的全称是journaling block driver 。这个进程实现的是文件系统的日志功能,磁盘使用日志功能来保证数据的完整性。这个需要评估一下安全和性能哪个更重要,解决方案是升级内核或者牺牲完整性来换性能。

差点被误导。

而使用命令 atop -d 发现其实是 snapd 占用

和这篇帖子情况一样 snapd持续运行,引起jbd2/sda2-8持续访问硬盘,占用大量io - Ubuntu中文论坛

1
2
3
[#6](https://forum.ubuntu.org.cn/viewtopic.php?p=3221983#p3221983)
帖子 由 sffred » 2020-06-06 16:04
我最终解决这个问题的方式是卸载snapd。反正我也用不着

snapd是ubuntu预装的一个软件包管理工具。

使用 snap list 发现只有一个core,也就是我没有基于snap安装过软件包。

通过 service snapd stop 关闭snapd,再通过 sar -d -p 3 观察硬盘占用,已经完全正常

至此确定是由snapd引发.

通过service snapd start 启动snapd,观察硬盘占用,先是再次上升数十秒后回到了正常。

如果下次再出现占用过高准备禁用或卸载snap。

禁用 systemctl disable snapd.service 卸载 apt purge snapd

相关命令

持续观察硬盘读写情况,每3秒刷新一次

1
sar -d -p 3

sar常用的的参数还有监控CPU情况的

1
sar -u 3

按IO从高到低排序监控进程,实时刷新

1
iotop

也能按IO从高到低排序,实时刷新,感觉比iotop好用

1
atop -d

atop也有监控CPU情况的

1
atop -u

在需要被挂载的服务器上安装nfs-kernel-server

1
sudo apt install nfs-kernel-server

/etc/exports

通过此文件配置共享目录,样例

1
2
3
4
5
6
7
8
9
10
# /etc/exports: the access control list for filesystems which may be exported
# to NFS clients. See exports(5).
#
# Example for NFSv2 and NFSv3:
# /srv/homes hostname1(rw,sync,no_subtree_check) hostname2(ro,sync,no_subtree_check)
#
# Example for NFSv4:
# /srv/nfs4 gss/krb5i(rw,sync,fsid=0,crossmnt,no_subtree_check)
# /srv/nfs4/homes gss/krb5i(rw,sync,no_subtree_check)
/data/share 192.168.1.157(insecure,rw,sync,all_squash,no_subtree_check) *(insecure,ro,async,all_squash,no_subtree_check)

这里配置了/data/share为共享目录,允许从192.168.1.157进行读写操作,允许从其他任意IP进行只读操作。

把共享目录的owner改成 nobody:nogroup

1
sudo chown nobody:nogroup /data/share

设置共享目录的权限

1
sudo chmod 777 /data/share

重新加载配置文件

1
sudo exportfs -a

在需要挂载的服务器需要安装nfs-common

1
apt install nfs-common

挂载命令样例 ,把140上的/data/share共享目录挂载到本地/mnt/140share

1
mount 192.168.1.140:/data/share /mnt/140share

端口

使用到的端口,如果有防火墙需要设置

111/tcp+udp

2049/tcp

不同版本的NFS服务器也可能需要更多

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Data ONTAP:
111 TCP/UDP portmapper
2049 TCP/UDP nfsd
635 TCP/UDP mountd
4045 TCP/UDP nlockmgr
4046 TCP/UDP status
4049 TCP/UDP rquotad

Data ONTAP 7-Mode:
111 TCP/UDP portmapper
2049 TCP/UDP nfsd
4046 TCP/UDP mountd
4045 TCP/UDP nlockmgr
4047 TCP/UDP status
4049 TCP/UDP rquotad

参考:Which Network File System (NFS) TCP and NFS UDP ports are used on the storage system? - NetApp Knowledge Base

另外可能用到一个随机端口,在/etc/default/nfs-kernel-server中通过

1
RPCMOUNTDOPTS="--port 33333"

可以使之固定为33333。

更改设置后重启服务

1
service nfs-kernel-server restart

如果希望系统启动时自动加载文件系统

/etc/fstab

通过此文件配置系统启动时自动挂载

样例

1
2
# <file system> <mount point>   <type>  <options>       <dump>  <pass>
192.168.1.140:/data/share /mnt/140share nfs defaults 0 0

重新加载配置文件

1
sudo mount -a

配置中type是被挂载的路径的类型

常用的类型有:

nfs表示远程linux的共享路径

cifs表示远程windows的共享路径

ext4表示本地ext4路径

配置中options是挂载选项,在挂载windows目录时,通过以下方式指定用户名和密码

1
defaults,auto,username=MZhDai,password=******,dir_mode=0777,file_mode=0777

另外挂载windows目录需要 mount.cifs 支持,如果没有可以通过以下命令安装

1
sudo apt install cifs-utils

挂载命令通过 -o指定选项

1
sudo mount.cifs //192.168.1.107/f /mnt/107f -o user=MZhDai,pass=******,dir_mode=0777,file_mode=0777

还有常用的选项如

1
2
3
4
5
ro :只读挂载
rw : 读写挂载
uid=`id -u <owner>` : 指定挂载后的所有者
gid=`id -g <group>` : 指定挂载后的所有组
iocharset=utf8 : 指定编码为uft8,解决中文乱码问题,我在WSL2挂载Windows共享遇到过一次

.cifs 可以省略,mount命令会自动识别需要挂载的路径类型

访问挂载的共享目录卡住

执行

1
ls /mnt

1
df -h

时卡住

可以使用

1
strace ls /mnt

1
strace df -h

查看命令执行的过程,看看最终卡在哪一步,比如卡在访问/mnt/abc

查看挂载参数

1
cat /proc/mounts

查看服务器支持的版本

1
nfsstat -s

卸载挂载点

1
umount <挂载点>

如果卡住试下强制卸载

1
umount -f <挂载点>

1
umount -lf <挂载点>

如果还是卸载失败可以看一下哪个进程在使用挂载点下的文件

1
lsof|grep <挂载点>

考虑杀死进程后再卸载

指定挂载使用的NFS协议版本

1
mount.nfs XXX XXX -o nfsvers=3

一般是v4版本卡住,改用v3可以解决

OSError: [Errno 37] Unable to open file

增加nolock挂载参数

1
mount.nfs XXX XXX -o nfsvers=3,nolock

主服务器:192.168.1.99

从服务器:192.168.1.150

配置主服务器

在主服务器创建repl用户

1
CREATE ROLE repl login replication password 'd71ea3'; 

配置repl用户访问权限

1
vim /etc/postgresql/10/main/pg_hba.conf
1
host    replication     repl            192.168.1.150/32               md5

配置主服务器

1
vim /etc/postgresql/10/main/postgresql.conf
1
2
3
4
5
6
7
8
9
wal_level = replica

archive_mode = on # enables archiving; off, on, or always
# (change requires restart)
archive_command = 'rsync -zaq %p postgres@192.168.1.150:/var/lib/postgresql/wal_restore/%f && test ! -f /var/lib/postgresql/archivedir/%f && cp %p /var/lib/postgresql/archivedir/%f'

max_wal_senders = 10
wal_keep_segments = 64
wal_sender_timeout = 60s

归档命令不加入rsync也可以,只需要在建立主从同步时手动把完整备份之后的归档复制到从库,后面配置从库时候会提到。

我实际使用的归档命令还加入了自动删除旧数据

1
archive_command = 'DIR=/var/lib/postgresql/archivedir; test ! -f $DIR/%f && cp --preserve=timestamps %p $DIR/%f; find $DIR -type f -mtime +31|xargs rm -f'

重启服务

1
service postgresql restart

配置从服务器

停止服务

1
service postgresql stop

删除所有数据

1
2
3
cd /var/lib/postgresql/10/main

rm -rf *

配置从服务器

1
vim /etc/postgresql/10/main/postgresql.conf
1
hot_standby = on

切换到postgres用户

1
sudo su - postgres

从主服务器创建初始备份,上面切换用户是为了不用调整文件权限

pg_basebackup -h 192.168.1.99 -U repl -D /var/lib/postgresql/10/main -F p -X stream -P -R -p 5432

Password: d71ea3

会自动生成 recovery.conf 启动之后会读取里面的配置进行主从同步

切回root

1
sudo su -

如果前面主服务器归档命令没有加入rsync,那么我们现在在主服务器上 复制最近一天内修改过的归档文件到从服务器

1
2
cd /var/lib/postgresql/archivedir
find -mtime -1|xargs -n 1 -I{} scp /var/lib/postgresql/archivedir/{} postgres@192.168.1.150:/var/lib/postgresql/wal_restore/

相当于整体上从库建立起同步需要的数据 = 完整备份 + 归档文件 + WAL缓存

从服务器启动服务

1
service postgresql start

从主服务器查看从服务器同步状态

1
select application_name, sync_state from pg_stat_replication;

查看系统进程运行状态,包括服务

1
systemctl status

所有可用的单元文件存放在 /lib/systemd/system/ 和 /etc/systemd/system/ 目录。

根据我的实验情况是我们应该在/lib/systemd/system/ 下存放.service文件,当设置了自启动后,会自动在 /etc/systemd/system/ 下创建一个软链接指向 /lib/systemd/system/ 下的文件。

查看所有已安装服务:

1
systemctl list-units --type=service

通过服务状态可以查看启动服务的.service配置文件

例如

1
service mongodb status

可以看到

1
/lib/systemd/system/mongodb.service

最重要的,运行命令,

1
ExecStart=/usr/bin/mongod --unixSocketPrefix=${SOCKETPATH} --config ${CONF} $DAEMON_OPTS

PS:要注意的是ExecStart指定的是一个阻塞的程序,不需要后台执行,如果不阻塞,服务会认为程序执行完了,认为服务不在启动状态。

以Kafka为例

1
2
3
4
5
6
7
8
9
10
11
12
13
[Unit]
Description=Kafka Server
After=network.target zookeeper.service

[Service]
Type=simple
ExecStart=/opt/kafka_2.12-2.3.1/bin/kafka-server-start.sh /opt/kafka_2.12-2.3.1/config/server.properties
Restart=on-failure
RestartPreventExitStatus=255

[Install]
WantedBy=multi-user.target
Alias=kafka.service

详细的.service文件编写方法可以参考 http://www.jinbuguo.com/systemd/systemd.service.html

修改服务配置文件后需要

1
systemctl daemon-reload

设置服务开机自启动

1
systemctl enable postgresql.service

查询是否自启动服务

1
systemctl is-enabled postgresql.service

取消服务器开机自启动

1
systemctl disable postgresql.service

Alembic需要SQLAlchemy支持,如果项目是使用Python基于SQLAlchemy的开发的,那么可以用Alembic管理数据库版本变化。

Alembic将版本间的一组变化称为一个迁移,将变化过程称为迁移。我们可以说从一个版本迁移到另一个版本。

我们先新建一个目录alembic

初始化Alembic

1
alembic init <YOUR_ALEMBIC_DIR>

例如

alembic init alembic_foo,foo可以是项目的数据库名

有两个文件需要修改

alembic.ini alembic_foo/env.py

alembic.ini 中修改sqlalchemy.url

例如 sqlalchemy.url = postgresql+psycopg2://用户名:密码@ip_address/dbname

env.py 修改 # target_metadata = mymodel.Base.metadata

修改把项目中用来定义数据模型基类的Base import进来,改成 target_metadata = Base.metadata,以便Alembic知道项目中有哪些表,表结构是什么

自动生成迁移(生成初始迁移)

1
alembic revision --autogenerate -m <log_message>

我都是用日期和时间作为log_message,例如

1
alembic revision --autogenerate -m "%date:~0,4%%date:~5,2%%date:~8,2%%time:~0,2%%time:~3,2%%time:~6,2%"

自动生成迁移的时候,Alembic会对比数据库现有的表结构和配置的Base.metadata对应的表结构,生成差异转换代码,我们首次生成的时候一般两边是一致的,所以生成的迁移脚本其实是什么都不做。

执行上面的命令可以看到会生成 alembic_foo/versions/xxxx.py ,其中xxx就是revision+log_message

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
"""20200326 91627

Revision ID: bab1fb444e93
Revises:
Create Date: 2020-03-26 09:16:31.264071

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'bab1fb444e93'
down_revision = None
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
pass
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
pass
# ### end Alembic commands ###

把数据库升级到最新版本

1
alembic upgrade head

其实会做一遍upgrade中的命令,现在为空就什么都不做。

其实alembic做了一件事,在数据库中创建了一张名为alembic_version的表,里面只有一行一列记录着当前的数据库版本

以后要修改数据库模型时的步骤如下:

(1)修改SQLAlchemy Base那边的代码,即通过ORM框架定义的表示数据表结构的Python类。

(2)执行 alembic revision --autogenerate ... 命令生成迁移脚本。

(3)检查自动生成的脚本,改成不准确的地方。(例如重命名会变成删除再添加,会丢失数据)

(4)执行 alembic upgrade head 把改动应用到数据库

降级或升级数据库

升级到最新前面已经说过了

1
alembic upgrade head

要指定版本的话,看到前面自动生成的py文件里面有个Revision ID,同时也是py文件的前缀。

升级到指定版本

1
alembic upgrade <Revision ID>

降级到指定版本

1
alembic downgrade <Revision ID>

参考:

https://alembic.sqlalchemy.org/en/latest/tutorial.html

https://www.cnblogs.com/blackmatrix/p/6236573.html

监视主线程卡死,卡死时保错再主动崩溃退出,同时打印当时的调用堆栈。
还可以再外部结合监视崩溃自动重启就可以在卡死时实现自动重启。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import threading
import traceback
import time
import sys
import os
from functools import wraps

class WatchDog(threading.Thread):

def __init__(self, timeout=10, echo=False):
super(WatchDog, self).__init__()
self.timeout = timeout
self.echo = echo
self.last_kicked_ts = time.time()
self.lock = threading.Lock()
self.thread_id = threading.currentThread().ident
self.terminated = False
self.setDaemon(True)
self.start()

def terminate(self):
self.terminated = True
self.join(self.timeout)

def kick(self):
self.lock.acquire()
self.last_kicked_ts = time.time()
self.lock.release()

def bark(self):
formated_frame_stack = self._get_formated_frame_stack()
if self.echo:
print("!!!!! WATCH DOG FAILURE TRIGGERED !!!!!\n" + formated_frame_stack, flush=True)
pid = os.getpid()
os.kill(pid, 2) # 通知进程退出
time.sleep(5) # 等待5秒
os.kill(pid, 9) # 发送强制退出

def run(self):
while not self.terminated:
ts = time.time()
self.lock.acquire()
is_timeout = ts - self.last_kicked_ts > self.timeout
self.lock.release()
if is_timeout:
self.bark()
n = int(max(self.timeout / 3, 1))
for i in range(n*10):
time.sleep(0.1)
if self.terminated:
break

@staticmethod
def _get_thread(tid):
for t in threading.enumerate():
if t.ident == tid:
return t
return None

@staticmethod
def _get_frame_stack(tid):
for thread_id, stack in sys._current_frames().items():
if thread_id == tid:
return stack
return None

def _get_formated_frame_stack(self):
info = []
th = self._get_thread(self.thread_id)
stack = self._get_frame_stack(self.thread_id)
info.append('%s thead_id=%d' % (th.name, self.thread_id))
for filename, lineno, _, line in traceback.extract_stack(stack):
info.append(' at %s(%s:%d)' % (line, filename[filename.rfind(os.path.sep) + 1:], lineno))
return '\n'.join(info)


def watch_dog(timeout=10, echo=False):
def inner(func):
def wrapper(*args, **kw):
dog = WatchDog(timeout=timeout, echo=echo)
ret = func(*args, **kw)
dog.terminate()
return ret
return wrapper
return inner

用例1,监控函数超时

1
2
3
4
5
6
7
8
9
10
@watch_dog(timeout=3, echo=True)
def func():
# do something
time.sleep(5)

def main():
func()

if __name__ == '__main__':
main()

我们监视func,设置其3秒超时,然后在里面sleep 5秒来引起超时,可以看到输出如下

1
2
3
4
5
6
7
!!!!! WATCH DOG FAILURE TRIGGERED !!!!!
MainThread thead_id=2164
at main()(watch_dog.py:97)
at func()(watch_dog.py:94)
at ret = func(*args, **kw)(watch_dog.py:81)
at time.sleep(5)(watch_dog.py:91)
[Finished in 3.5s with exit code 2]

在程序启动3.5秒后就主动崩溃了

用例2,监控循环超时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def main():
watch_dog = WatchDog(timeout=3, echo=True)

# main loop
# while True:
for i in range(0, 10):
time.sleep(i)
print(f"i={i}")
watch_dog.kick()
watch_dog.terminated()


if __name__ == '__main__':
main()

输出如下

1
2
3
4
5
6
7
8
9
i=0
i=1
i=2
i=3
!!!!! WATCH DOG FAILURE TRIGGERED !!!!!
MainThread thead_id=9060
at main()(watch_dog.py:101)
at time.sleep(i)(watch_dog.py:94)
[Finished in 10.1s with exit code 2]

用例3,超时自动重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import multiprocessing

def main():
...

if __name__ == '__main__':
RETRY_TIMES = 5
for i in range(RETRY_TIMES): # 重试次数
child_process = multiprocessing.Process(target=main)
child_process.start()
child_process.join()

if child_process.exitcode == 0:
print("子进程正常结束")
exit(0)
elif i < RETRY_TIMES - 1:
print("子进程异常结束,即将重试")
else:
print("子进程异常结束,重试超次")
exit(child_process.exitcode)

通过

1
tzselect

修改时区

根据屏幕提示选择选到 Asia/Shanghai 我这里是按4-9-1

然后把

1
TZ='Asia/Shanghai'; export TZ

加入到 .profile 中保证重启后有效

修改 /etc/localtime 链接到的文件

1
ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

修改 /etc/timezone 的文件内容

1
echo Asia/Shanghai > /etc/timezone

如果在第一步tzselect就报错,需要先安装tzdata,也可以用下面的命令一步到位

1
2
3
4
5
6
7
export TZ=Asia/Shanghai \
DEBIAN_FRONTEND=noninteractive
apt update -yqq \
&& apt install -yqq tzdata \
&& ln -fs /usr/share/zoneinfo/${TZ} /etc/localtime \
&& echo ${TZ} > /etc/timezone \
&& dpkg-reconfigure --frontend noninteractive tzdata

通过driver可以进行类似ActiveObject模式的设计,在此基础上还实现了通用的消息机制,基于driver进行编程可以充分的解耦。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# -*- coding: utf-8 -*-
import threading
import time
import queue
from dataclasses import dataclass, field
from typing import Any
import signal
import traceback
import os


@dataclass(order=True)
class ScheduledItem:
time_sec: float
cb: Any = field(compare=False)


class Driver:
def __init__(self):
self._scheduled = queue.PriorityQueue()
self._scheduled_every_lock = threading.Lock()
self._scheduled_every = []
self._callbacks_lock = threading.Lock()
self._callbacks = {}
self._async_queue = queue.Queue()
self._epoch_sec = time.time()
self._debug_epoch = None
self._last_epoch = None
self._sigterm_received = False

# 得到driver内的当前时间
def get_epoch(self):
return self._epoch_sec

# 设置调试时间,设置了调试时间driver内将不更新当前时间而采用调试时间
def set_debug_epoch(self, epoch):
self._debug_epoch = epoch

# 主循环
# wait_sync_interval 每趟循环的时间间隔,如果为0相当于占用一个core
# intercept_signals 拦截并处理信号,默认处理CTRL+C和kill,收到时退出主循环
def run(self, wait_sync_interval=0, intercept_signals=(signal.SIGINT, signal.SIGTERM)):
if intercept_signals:
self.intercept_sigterm(intercept_signals)
while not self.sigterm_received():
self.run_step(wait_sync_interval)

# 执行一趟主逻辑,一般放在主循环中执行
def run_step(self, wait_sync_interval=0):
if self._debug_epoch:
self._epoch_sec = self._debug_epoch
else:
self._epoch_sec = time.time()
if self._last_epoch is not None:
if self._epoch_sec - self._last_epoch < wait_sync_interval:
t = wait_sync_interval - (self._epoch_sec - self._last_epoch)
time.sleep(t)
self._epoch_sec = time.time()
self._last_epoch = self._epoch_sec

self._do_async()
self._do_schedule()
self._do_schedule_every()

# 计划单次定时任务
def schedule(self, cb, time_sec):
self._scheduled.put_nowait( ScheduledItem(time_sec, cb) )

# 计划重复任务
def schedule_every(self, cb, interval_sec):
self._scheduled_every_lock.acquire()
self._scheduled_every.append( { "next_sec":self._epoch_sec+interval_sec, "interval":interval_sec, "cb":cb } )
self._scheduled_every_lock.release()

# 增加消息接收者
def add_receiver(self, topic_or_type, cb):
self._callbacks_lock.acquire()
if topic_or_type not in self._callbacks:
self._callbacks[topic_or_type] = set()
self._callbacks[topic_or_type].add(cb)
self._callbacks_lock.release()
return cb

# 删除消息接收者
def remove_receiver(self, topic_or_type, cb):
self._callbacks_lock.acquire()
if topic_or_type in self._callbacks:
if cb in self._callbacks[topic_or_type]:
self._callbacks[topic_or_type].remove(cb)
self._callbacks_lock.release()

# 同步发送消息
def send(self, obj, topic=None):
if topic == None:
topic = type(obj)
cbs = []
self._callbacks_lock.acquire()
if topic in self._callbacks.keys():
cbs = list(self._callbacks[topic])
self._callbacks_lock.release()
for cb in cbs:
cb(obj)

# 异步发送消息
def send_async(self, obj, topic=None):
self._async_queue.put_nowait( (obj, topic) )

def _do_async(self):
while not self._async_queue.empty():
self.send(*self._async_queue.get_nowait())

def _do_schedule(self):
i = 0
while not self._scheduled.empty():
item = self._scheduled.get_nowait()
if item.time_sec > self._epoch_sec:
self._scheduled.put_nowait(item)
break
item.cb(self._epoch_sec)

def _do_schedule_every(self):
cbs = []
self._scheduled_every_lock.acquire()
for o in self._scheduled_every:
while self._epoch_sec >= o["next_sec"]:
cbs.append(o["cb"])
o["next_sec"] += o["interval"]
self._scheduled_every_lock.release()
for cb in cbs:
cb(self._epoch_sec)

# 拦截终止信号
def intercept_sigterm(self, intercept_signals):
for sig in intercept_signals:
signal.signal(sig, bind(Driver._on_signal, self))

# 是否收到了终止信号
def sigterm_received(self):
return self._sigterm_received

def _on_signal(self, signum, stack):
print(f"Signal #{signum} received, Traceback:")
for filename, lineno, _, line in traceback.extract_stack(stack):
print(' at %s(%s:%d)' % (line, filename[filename.rfind(os.path.sep) + 1:], lineno))
self._sigterm_received = True


def bind(func, *args, **kw):
return lambda *_args, **_kw: func(*args, *_args, **kw, **_kw)


driver = Driver()


if __name__ == '__main__':
print("initialize")

driver.schedule(lambda epoch: print(f"Do sth. one time"), driver.get_epoch()+3)
driver.schedule_every(lambda epoch: print(f"Do sth. regularly"), 1)
driver.schedule(lambda _: os.system(f"kill {os.getpid()}"), driver.get_epoch()+10)
driver.run(0.1)

print("finalize")