业务流程:
1)用户选择实人认证后会在服务端初始化一条记录;
2)用户在钉钉移动端按照指示完成人脸比对;
3)比对完成后访问服务端修改数据库状态。
问题现象:数据库一个人有两条认证记录。
原因:并发导致了不幂等。

如果依赖的组件天然幂等,比如说数据库唯一键的约束,那么不需要做太多的处理,否则,可以采用以下方法来保证幂等。
分布式锁
如何实现一个分布式锁?
方案一
分布式系统中常见有两个问题:
1)单点故障问题,即当持有锁的应用发生单点故障时,锁将被长期无效占有;
2)网络超时问题,即当客户端发生网络超时但实际上锁成功时,我们无法再次正确的获取锁。
要解决问题1,一个简单的方案是引入过期时间(lease time),对锁的持有将是有时效的,当应用发生单点故障时,被其持有的锁可以自动释放。
要解决问题2,一个简单的方案是支持可重入,我们为每个获取锁的客户端都配置一个不会重复的身份标识(通常是UUID),上锁成功后锁将带有该客户端的身份标识。当实际上锁成功而客户端超时重试时,我们可以判断锁已被该客户端持有而返回成功。具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
| public class MdbDistributeLock implements DistributeLock {
private final int namespace;
private final String lockName;
private final String lockId;
private boolean locked;
private final TairManager tairManager;
public MdbDistributeLock(TairManager tairManager, int namespace, String lockCacheKey) {
this.tairManager = tairManager; this.namespace = namespace; this.lockName = lockCacheKey; this.lockId = UUID.randomUUID().toString(); }
@Override public boolean tryLock() {
try { Result<DataEntry> getResult = null; ResultCode getResultCode = null; for (int cnt = 0; cnt < DEFAULT_RETRY_TIMES; cnt++) { getResult = tairManager.get(namespace, lockName); getResultCode = getResult == null ? null : getResult.getRc(); if (noNeedRetry(getResultCode)) { break; } }
if (ResultCode.SUCCESS.equals(getResultCode) && getResult.getValue() != null && lockId.equals(getResult.getValue().getValue())) { locked = true; return true; }
if (!ResultCode.DATANOTEXSITS.equals(getResultCode)) { log.error("tryLock fail code={} lock={} traceId={}", getResultCode, this, EagleEye.getTraceId()); return false; }
ResultCode putResultCode = null; for (int cnt = 0; cnt < DEFAULT_RETRY_TIMES; cnt++) { putResultCode = tairManager.put(namespace, lockName, lockId, MDB_CACHE_VERSION, DEFAULT_EXPIRE_TIME_SEC); if (noNeedRetry(putResultCode)) { break; } } if (!ResultCode.SUCCESS.equals(putResultCode)) { log.error("tryLock fail code={} lock={} traceId={}", getResultCode, this, EagleEye.getTraceId()); return false; } locked = true; return true;
} catch (Exception e) { log.error("DistributedLock.tryLock fail lock={}", this, e); } return false; }
@Override public void unlock() {
if (!locked) { return; } ResultCode resultCode = tairManager.invalid(namespace, lockName); if (!resultCode.isSuccess()) { log.error("DistributedLock.unlock fail lock={} resultCode={} traceId={}", this, resultCode, EagleEye.getTraceId()); } locked = false; }
private boolean noNeedRetry(ResultCode resultCode) { return resultCode != null && !ResultCode.CONNERROR.equals(resultCode) && !ResultCode.TIMEOUT.equals( resultCode) && !ResultCode.UNKNOW.equals(resultCode); } }
public class MdbDistributeLockFactory implements DistributeLockFactory {
@Setter private int namespace;
@Setter private MultiClusterTairManager mtairManager;
@Override public DistributeLock getLock(String lockName) { return new MdbDistributeLock(mtairManager, namespace, lockName); } }
|
分布式锁的一般使用方式如下:
- 初始化分布式锁的工厂
- 利用工厂生成一个分布式锁实例
- 使用该分布式实例上锁和解锁操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Test public void testTryLock() {
MdbDistributeLockFactory mdbDistributeLockFactory = new MdbDistributeLockFactory(); mdbDistributeLockFactory.setNamespace(603); mdbDistributeLockFactory.setMtairManager(new MultiClusterTairManager());
DistributeLock lock = mdbDistributeLockFactory.getLock("TestLock");
boolean locked = lock.tryLock(); if (!locked) { return; } try { } finally { lock.unlock(); } }
|
这样实现简单,但是也存在问题:释放锁的时候只是简单的将缓存中的key失效,所以存在错误释放他人已持有锁问题。

设想一种情况,当占有锁的Client 1在释放锁之前,锁就已经到期了,Client 2将获取锁,此时锁被Client 2持有,但是Client 1可能会错误的将其释放。只要锁的租期设置的足够长,该问题出现几率就足够小。
一个更优秀的方案,我们给每个锁都设置一个身份标识,在释放锁的时候,1)首先查询锁是否是自己的,2)如果是自己的则释放锁。
受限于实现方式,步骤1和步骤2不是原子操作,在步骤1和步骤2之间,如果锁到期被其他客户端获取,此时也会错误的释放他人的锁。
方案二
借助Redis的Lua脚本,可以完美的解决存在错误释放他人已持有锁问题的。
当我们想要获取锁时,我们可以执行如下方法:
1
| SET resource_name my_random_value NX PX 30000
|
当我们想要释放锁时,我们可以执行如下的Lua脚本
1 2 3 4 5
| if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
|
方案三
在方案一和方案二的讨论过程中,有一个问题被我们反复提及:锁的自动释放。
他的好处以及坏处如下:
1)一方面它很好的解决了持有锁的客户端单点故障的问题
2)另一方面,如果锁提前释放,就会出现锁的错误持有状态
这个时候,我们可以引入Watch Dog自动续租机制,参考以下Redisson是如何实现的。
在上锁成功后,Redisson会调用renewExpiration()方法开启一个Watch Dog线程,为锁自动续期。每过1/3时间续一次,成功则继续下一次续期,失败取消续期操作。
以下是Redisson的续期操作,Redisson也是使用Lua脚本进行锁续租的:1)判断锁是否存在;2)如果存在则重置过期时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; }
Timeout task = commandExecutor.getConnectionManager().newTimeout(timeout -> { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; }
RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; }
if (res) { renewExpiration(); } else { cancelExpirationRenewal(null); } }); }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task); } protected RFuture<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }
|
方案四
借助Redisson的自动续期机制,我们无需再担心锁的自动释放。但是讨论到这里,我还是不得不面对一个问题:分布式锁本身不是一个分布式应用。当Redis服务器故障无法正常工作时,整个分布式锁也就无法提供服务。
参考
《阿里博客》