业务流程:

1)用户选择实人认证后会在服务端初始化一条记录;
2)用户在钉钉移动端按照指示完成人脸比对;
3)比对完成后访问服务端修改数据库状态。

问题现象:数据库一个人有两条认证记录。

原因:并发导致了不幂等。

图片

如果依赖的组件天然幂等,比如说数据库唯一键的约束,那么不需要做太多的处理,否则,可以采用以下方法来保证幂等。

分布式锁

如何实现一个分布式锁?

方案一

分布式系统中常见有两个问题:

1)单点故障问题,即当持有锁的应用发生单点故障时,锁将被长期无效占有;

2)网络超时问题,即当客户端发生网络超时但实际上锁成功时,我们无法再次正确的获取锁。

要解决问题1,一个简单的方案是引入过期时间(lease time),对锁的持有将是有时效的,当应用发生单点故障时,被其持有的锁可以自动释放。

要解决问题2,一个简单的方案是支持可重入,我们为每个获取锁的客户端都配置一个不会重复的身份标识(通常是UUID),上锁成功后锁将带有该客户端的身份标识。当实际上锁成功而客户端超时重试时,我们可以判断锁已被该客户端持有而返回成功。具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
public class MdbDistributeLock implements DistributeLock {

/**
* 锁的命名空间
*/
private final int namespace;

/**
* 锁对应的缓存key
*/
private final String lockName;

/**
* 锁的唯一标识,保证可重入,以应对put成功,但是返回超时的情况
*/
private final String lockId;

/**
* 是否持有锁。true:是
*/
private boolean locked;

/**
* 缓存实例
*/
private final TairManager tairManager;

public MdbDistributeLock(TairManager tairManager, int namespace, String lockCacheKey) {

this.tairManager = tairManager;
this.namespace = namespace;
this.lockName = lockCacheKey;
this.lockId = UUID.randomUUID().toString();
}

@Override
public boolean tryLock() {

try {
//获取锁状态
Result<DataEntry> getResult = null;
ResultCode getResultCode = null;
for (int cnt = 0; cnt < DEFAULT_RETRY_TIMES; cnt++) {
getResult = tairManager.get(namespace, lockName);
getResultCode = getResult == null ? null : getResult.getRc();
if (noNeedRetry(getResultCode)) {
break;
}
}

//重入,已持有锁,返回成功
if (ResultCode.SUCCESS.equals(getResultCode)
&& getResult.getValue() != null && lockId.equals(getResult.getValue().getValue())) {
locked = true;
return true;
}

//不可获取锁,返回失败
if (!ResultCode.DATANOTEXSITS.equals(getResultCode)) {
log.error("tryLock fail code={} lock={} traceId={}", getResultCode, this, EagleEye.getTraceId());
return false;
}

//尝试获取锁
ResultCode putResultCode = null;
for (int cnt = 0; cnt < DEFAULT_RETRY_TIMES; cnt++) {
putResultCode = tairManager.put(namespace, lockName, lockId, MDB_CACHE_VERSION,
DEFAULT_EXPIRE_TIME_SEC);
if (noNeedRetry(putResultCode)) {
break;
}
}
if (!ResultCode.SUCCESS.equals(putResultCode)) {
log.error("tryLock fail code={} lock={} traceId={}", getResultCode, this, EagleEye.getTraceId());
return false;
}
locked = true;
return true;

} catch (Exception e) {
log.error("DistributedLock.tryLock fail lock={}", this, e);
}
return false;
}

@Override
public void unlock() {

if (!locked) {
return;
}
ResultCode resultCode = tairManager.invalid(namespace, lockName);
if (!resultCode.isSuccess()) {
log.error("DistributedLock.unlock fail lock={} resultCode={} traceId={}", this, resultCode,
EagleEye.getTraceId());
}
locked = false;
}

/**
* 判断是否需要重试
*
* @param resultCode 缓存的返回码
* @return true:不用重试
*/
private boolean noNeedRetry(ResultCode resultCode) {
return resultCode != null && !ResultCode.CONNERROR.equals(resultCode) && !ResultCode.TIMEOUT.equals(
resultCode) && !ResultCode.UNKNOW.equals(resultCode);
}
}

// 分布式锁工厂
public class MdbDistributeLockFactory implements DistributeLockFactory {

/**
* 缓存的命名空间
*/
@Setter
private int namespace;

@Setter
private MultiClusterTairManager mtairManager;

@Override
public DistributeLock getLock(String lockName) {
return new MdbDistributeLock(mtairManager, namespace, lockName);
}
}

分布式锁的一般使用方式如下:

  • 初始化分布式锁的工厂
  • 利用工厂生成一个分布式锁实例
  • 使用该分布式实例上锁和解锁操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

@Test
public void testTryLock() {

//初始化工厂
MdbDistributeLockFactory mdbDistributeLockFactory = new MdbDistributeLockFactory();
mdbDistributeLockFactory.setNamespace(603);
mdbDistributeLockFactory.setMtairManager(new MultiClusterTairManager());

//获得锁
DistributeLock lock = mdbDistributeLockFactory.getLock("TestLock");

//上锁解锁操作
boolean locked = lock.tryLock();
if (!locked) {
return;
}
try {
//do something
} finally {
lock.unlock();
}
}

这样实现简单,但是也存在问题:释放锁的时候只是简单的将缓存中的key失效,所以存在错误释放他人已持有锁问题。

图片

设想一种情况,当占有锁的Client 1在释放锁之前,锁就已经到期了,Client 2将获取锁,此时锁被Client 2持有,但是Client 1可能会错误的将其释放。只要锁的租期设置的足够长,该问题出现几率就足够小。

一个更优秀的方案,我们给每个锁都设置一个身份标识,在释放锁的时候,1)首先查询锁是否是自己的,2)如果是自己的则释放锁。

受限于实现方式,步骤1和步骤2不是原子操作,在步骤1和步骤2之间,如果锁到期被其他客户端获取,此时也会错误的释放他人的锁。

方案二

借助Redis的Lua脚本,可以完美的解决存在错误释放他人已持有锁问题的。

当我们想要获取锁时,我们可以执行如下方法:

1
SET resource_name my_random_value NX PX 30000

当我们想要释放锁时,我们可以执行如下的Lua脚本

1
2
3
4
5
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end

方案三

在方案一和方案二的讨论过程中,有一个问题被我们反复提及:锁的自动释放。

他的好处以及坏处如下:

1)一方面它很好的解决了持有锁的客户端单点故障的问题

2)另一方面,如果锁提前释放,就会出现锁的错误持有状态

这个时候,我们可以引入Watch Dog自动续租机制,参考以下Redisson是如何实现的。

在上锁成功后,Redisson会调用renewExpiration()方法开启一个Watch Dog线程,为锁自动续期。每过1/3时间续一次,成功则继续下一次续期,失败取消续期操作。

以下是Redisson的续期操作,Redisson也是使用Lua脚本进行锁续租的:1)判断锁是否存在;2)如果存在则重置过期时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}

Timeout task = commandExecutor.getConnectionManager().newTimeout(timeout -> {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}

RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}

if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}

方案四

借助Redisson的自动续期机制,我们无需再担心锁的自动释放。但是讨论到这里,我还是不得不面对一个问题:分布式锁本身不是一个分布式应用。当Redis服务器故障无法正常工作时,整个分布式锁也就无法提供服务。

参考

《阿里博客》