业务场景

背景:Excel的导入导出。由于数据量大,为了优化导出时的分页查询,插入数据时采用的是手动维护的自增id。

具体场景:如果有多个专家同时导入题库,如何保证手动维护自增id的不重复。

分析:由于同时多个专家同时导入的情况并不多见,为了不影响绝大多数的场景的性能,这里准备使用乐观锁来解决并发。

具体代码

使用EasyExcel,具体策略是单线程读取,然后使用线程池处理数据插入。核心的LockCityDataListener代码如下:

注意,这一版代码是存在问题的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package com.example.springboot_vue.utils.excel_util;

import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.read.listener.ReadListener;
import com.alibaba.excel.util.ListUtils;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.example.springboot_vue.mapper.CityMapper;
import com.example.springboot_vue.pojo.city.City;
import com.example.springboot_vue.rabbitmq_test.RabbitMQProvider;
import lombok.SneakyThrows;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.defaults.DefaultSqlSessionFactory;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockCityDataListener implements ReadListener<City> {

CityMapper cityMapper;

// 用于标记当前申请的mark_id是否用完
private int index = 0;

// 手动开启事务
DataSourceTransactionManager dataSourceTransactionManager;

// 当前申请到的起始id
int currentStartNumber = 0;

/**
* 这里是设置批量插入数据的大小
*/
private static final int BATCH_COUNT = 1000;

/**
* 缓存的数据
*/
private List<City> cachedDataList = ListUtils.newArrayListWithExpectedSize(BATCH_COUNT);

/**
* 创建一个线程池
*/
ExecutorService executorService = new ThreadPoolExecutor(20, 40, 10, TimeUnit.MINUTES, new LinkedBlockingDeque<>());

// 存储所有错误数据,暂时没用
List<List<City>> wrongList = new ArrayList<>();

// 用于使主线程等待所有异步线程结束
List<CompletableFuture<Integer>> allFutures = new ArrayList<>();

public LockCityDataListener(CityMapper cityMapper, DataSourceTransactionManager dataSourceTransactionManager) {
this.cityMapper = cityMapper;
this.dataSourceTransactionManager = dataSourceTransactionManager;
}

@SneakyThrows
@Override
public void invoke(City city, AnalysisContext analysisContext) {
if (index <= 0) {
// 这里是手动开启事务
DefaultTransactionDefinition df = new DefaultTransactionDefinition();
df.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
TransactionStatus transaction = dataSourceTransactionManager.getTransaction(df);
// 注意
Map<String, Object> map = cityMapper.getTotalData();
int currentData = (int) map.get("column_count");
// 获取总行数
int rowNumber = analysisContext.readSheetHolder().getApproximateTotalRowNumber() - 1;
int resultNumber = currentData + rowNumber;
int res = cityMapper.setData((Integer) map.get("version"), resultNumber);
// 如果更新失败,就继续尝试更新
while (res == 0) {
// 注意
map = cityMapper.getTotalData();
resultNumber = rowNumber + (int) map.get("column_count");
// 注意
res = cityMapper.setData((Integer) map.get("version"), resultNumber);
}
index = rowNumber;
currentStartNumber = (int) map.get("column_count");

// 手动提交事务
dataSourceTransactionManager.commit(transaction);
}
city.setMarkId(++currentStartNumber);
cachedDataList.add(city);
index--;
// 一次1000条,如果超过1000条,就清除之前的内容
if (cachedDataList.size() >= BATCH_COUNT) {
List<City> tempList = new ArrayList<>(cachedDataList);
// 存储完成清理 list
cachedDataList = ListUtils.newArrayListWithExpectedSize(BATCH_COUNT);
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> saveData(tempList), executorService)
.exceptionally(ex -> {
wrongList.add(tempList);
return null;
});
allFutures.add(future);
}
}

/**
* 这里是处理遗留的数据
* 注意:这里是每一个sheet读取完成后都会触发一次
*/
@Override
public void doAfterAllAnalysed(AnalysisContext analysisContext) {
if (CollectionUtils.isNotEmpty(cachedDataList)) {
saveData(cachedDataList);
// 这里如果不进行重置,如果每个sheet不是1000条数据,会存在多插入的情况
cachedDataList = ListUtils.newArrayListWithExpectedSize(BATCH_COUNT);
index = 0;
}
CompletableFuture<Void> allCompleted = CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[0]));
allCompleted.join();
}

public int saveData(List<City> cachedDataList) {
return cityMapper.insertCityAll(cachedDataList);
}
}

SQL如下:

1
2
3
4
5
6
7
8
9
@Select("select column_count, version from version_lock")
Map<String, Object> getTotalData();

<insert id="insertCityAll" parameterType="java.util.List">
insert into city(`name`, parent_name, `number`, column_1, column_2, mark_id) values
<foreach collection="list" separator="," item="item">
(#{item.name}, #{item.parentName}, #{item.number}, #{item.test}, #{item.test2}, #{item.markId})
</foreach>
</insert>

问题描述

当有两个线程同时进行插入时,但凡出现冲突,就会导致其中一个线程无限循环,另一个线程等待锁超时。

具体原因

问题出现在一下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 查询数据
Map<String, Object> map = cityMapper.getTotalData();
// 省略几行代码...
// 修改数据
int res = cityMapper.setData((Integer) map.get("version"), resultNumber);
// 如果更新失败,就继续尝试更新
while (res == 0) {
// 注意
map = cityMapper.getTotalData();
resultNumber = rowNumber + (int) map.get("column_count");
// 注意
res = cityMapper.setData((Integer) map.get("version"), resultNumber);
}

想一下,如果res为0,意味着两个线程同时查询数据,然后进行了更新,但是只有一个线程可以更新成功。另一个线程由于版本号不对进如while循环。

关键点来了,就是这个while循环,在进入时,当前事务并未结束(假设事务A),虽然另外一个线程提交了它的事务(假设事务B),但由于默认的隔离级别是可重复读,这意味着其他事务(事务B)所做的修改对于当前事务(事务A)是不可见的。也就意味着,哪怕事务A进入了循环,重新执行了查询,但是它查询到的数据和while循环之外查询到的是一致的。而Update进行修改时,它所做的是当前读,也就是说它可以读取到最新的数据。所以修改永远无法成功。

这样就出现了上面的场景,一个线程无限循环,而另一个线程由于无限循环的事务没有提交,一直处于等待状态,最终超时断开连接。