基于Redis的分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# -*- coding: utf-8 -*-
from redis import Redis
import threading
import time
import uuid

# 互斥锁
class Mutex:
def __init__(self, name, server="127.0.0.1"):
self.name = name
self.key_name = "MUTEX_" + name
self.id = uuid.uuid4().hex
self.redis = Redis(host=server)

def acquire(self, blocking=True, ex=120):
r = self.redis.set(self.key_name, self.id, ex=ex, nx=True)
if blocking:
while not r:
time.sleep(0.01)
r = self.redis.set(self.key_name, self.id, ex=ex, nx=True)
return r

def release(self):
if self.acquired():
self.redis.delete(self.key_name)

def acquired(self):
r = self.redis.get(self.key_name)
return r != None and r.decode() == str(self.id)

def __enter__(self):
self.acquire()

def __exit__(self, exc_type, exc_value, exc_trackback):
self.release()
if exc_value != None:
raise exc_value


# 读写锁
class ReadWriteLock:
def __init__(self, name, server="127.0.0.1"):
self.name = name
self.server = server
self.rlock_name = "RLOCK_" + name
self.wlock_name = "WLOCK_" + name
self.id = uuid.uuid4().hex
self.redis = Redis(host=server)
self.lock_type = None

def read_lock(self, blocking=True, ex=120):
mutex = Mutex(self.name, self.server)
try:
mutex.acquire()
wlock_locked = self.redis.get(self.wlock_name)
if wlock_locked:
if blocking:
while wlock_locked:
mutex.release()
time.sleep(0.05)
mutex.acquire()
wlock_locked = self.redis.get(self.wlock_name)
else:
return False

pipeline = self.redis.pipeline()
now = time.time()
pipeline.zremrangebyscore(self.rlock_name, 0, int((now-ex)*1000) )
pipeline.zadd(self.rlock_name, {self.id: int(now*1000)} )
pipeline.expire(self.rlock_name, ex)
pipeline.execute()
self.lock_type = 'R'
return True
except Exception as e:
raise
finally:
mutex.release()

def write_lock(self, blocking=True, ex=120):
mutex = Mutex(self.name, self.server)
try:
mutex.acquire()
self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-ex)*1000) )
r = self.redis.zcard(self.rlock_name)
if r:
if blocking:
while r:
mutex.release()
time.sleep(0.05)
mutex.acquire()
self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-ex)*1000) )
r = self.redis.zcard(self.rlock_name)
else:
return False
r = self.redis.set(self.wlock_name, self.id, ex=ex, nx=True)
if blocking:
while not r:
mutex.release()
time.sleep(0.05)
mutex.acquire()
r = self.redis.set(self.wlock_name, self.id, ex=ex, nx=True)
if r:
self.lock_type = 'W'
return r
except Exception as e:
raise
finally:
mutex.release()

def unlock(self):
if self.lock_type == 'R':
self.redis.zrem(self.rlock_name, self.id)
self.lock_type = None
elif self.lock_type == 'W':
r = self.redis.get(self.wlock_name)
if r != None and r.decode() == str(self.id):
self.redis.delete(self.wlock_name)
self.lock_type = None

支持阻塞和非阻塞加锁,默认阻塞,非阻塞用法。
考虑了锁住后崩溃,解决方案是超时后自动结束。
考虑了未崩溃但是超时(这种情况首先应该调整超时时间设置,或者程序应调整锁住的代码,例如多次分段锁)。
在这种异常情况发生时,可能产生一边释放了锁但还在访问,另一边加上了锁,记住这是异常情况,但我们要保证即使它发生了也尽量能正常工作下去,
对于确实存在访问冲突的那么是没办法的,该异常就异常好了,还有种情况是虽然加了锁,但是并没有访问冲突,其实程序可以正常下去,但是这里会发生什么呢?

1
2
3
4
对于A线程,手动加锁----------超时自动解锁|------------------------手动解锁|
对于B线程,-------------手动加锁-------|(等到此加锁成功)---------------------手动解锁|
对于C线程,-------------------------------|------------手动加锁-------|(如果在此加锁成功是错误的!)

A线程因为超时自动解锁后虽然和B线程没有发生访问冲突,但是它解了B线程的锁,导致C线程加锁成功,而B线程实际还没解锁,这又制造了潜在的B线程和C线程的访问冲突。
所以手动解锁时应该判断下当前的锁是否是自己加的。这就是acquired函数中r.decode() == str(self.id)存在的意义。