Business Scenario
Background: Excel import and export. Because the data volume is large, a manually maintained auto-increment id is written with each inserted row in order to speed up the paginated queries used during export.
Concrete scenario: when several experts import question banks at the same time, how do we keep the manually maintained auto-increment ids from colliding?
Analysis: simultaneous imports by multiple experts are rare, so to avoid hurting performance in the common single-importer case, optimistic locking is used here to handle the concurrency.
The Code
The import uses EasyExcel: a single thread reads the sheet, and a thread pool handles the batch inserts. The core LockCityDataListener is as follows.
Note: this version of the code is buggy.
```java
package com.example.springboot_vue.utils.excel_util;

import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.read.listener.ReadListener;
import com.alibaba.excel.util.ListUtils;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.example.springboot_vue.mapper.CityMapper;
import com.example.springboot_vue.pojo.city.City;
import lombok.SneakyThrows;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

public class LockCityDataListener implements ReadListener<City> {

    CityMapper cityMapper;
    DataSourceTransactionManager dataSourceTransactionManager;

    // Rows in this file that still need an id assigned
    private int index = 0;
    // Next mark_id to hand out from the claimed range
    int currentStartNumber = 0;

    private static final int BATCH_COUNT = 1000;
    private List<City> cachedDataList = ListUtils.newArrayListWithExpectedSize(BATCH_COUNT);

    ExecutorService executorService =
            new ThreadPoolExecutor(20, 40, 10, TimeUnit.MINUTES, new LinkedBlockingDeque<>());

    // Batches whose async insert failed, kept for later handling
    List<List<City>> wrongList = new ArrayList<>();
    List<CompletableFuture<Integer>> allFutures = new ArrayList<>();

    public LockCityDataListener(CityMapper cityMapper, DataSourceTransactionManager dataSourceTransactionManager) {
        this.cityMapper = cityMapper;
        this.dataSourceTransactionManager = dataSourceTransactionManager;
    }

    @SneakyThrows
    @Override
    public void invoke(City city, AnalysisContext analysisContext) {
        // On the first row, claim an id range for the whole file:
        // [column_count, column_count + rowNumber), guarded by a version column.
        if (index <= 0) {
            DefaultTransactionDefinition df = new DefaultTransactionDefinition();
            df.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
            TransactionStatus transaction = dataSourceTransactionManager.getTransaction(df);

            Map<String, Object> map = cityMapper.getTotalData();
            int currentData = (int) map.get("column_count");
            int rowNumber = analysisContext.readSheetHolder().getApproximateTotalRowNumber() - 1;
            int resultNumber = currentData + rowNumber;

            // CAS-style update: succeeds only if the version is unchanged
            int res = cityMapper.setData((Integer) map.get("version"), resultNumber);
            while (res == 0) {
                // Lost the race: re-read and retry (this retry loop is where the bug lives)
                map = cityMapper.getTotalData();
                resultNumber = rowNumber + (int) map.get("column_count");
                res = cityMapper.setData((Integer) map.get("version"), resultNumber);
            }

            index = rowNumber;
            currentStartNumber = (int) map.get("column_count");
            dataSourceTransactionManager.commit(transaction);
        }

        // Assign the next id from the claimed range
        city.setMarkId(++currentStartNumber);
        cachedDataList.add(city);
        index--;

        // Hand off a full batch to the thread pool for insertion
        if (cachedDataList.size() >= BATCH_COUNT) {
            List<City> tempList = new ArrayList<>(cachedDataList);
            cachedDataList = ListUtils.newArrayListWithExpectedSize(BATCH_COUNT);
            CompletableFuture<Integer> future = CompletableFuture
                    .supplyAsync(() -> saveData(tempList), executorService)
                    .exceptionally(ex -> {
                        wrongList.add(tempList);
                        return null;
                    });
            allFutures.add(future);
        }
    }

    @Override
    public void doAfterAllAnalysed(AnalysisContext analysisContext) {
        // Flush the last partial batch, then wait for all async inserts to finish
        if (CollectionUtils.isNotEmpty(cachedDataList)) {
            saveData(cachedDataList);
            cachedDataList = ListUtils.newArrayListWithExpectedSize(BATCH_COUNT);
            index = 0;
        }
        CompletableFuture<Void> allCompleted =
                CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[0]));
        allCompleted.join();
    }

    public int saveData(List<City> cachedDataList) {
        return cityMapper.insertCityAll(cachedDataList);
    }
}
```
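For context, the listener would typically be driven like this (a hypothetical wiring sketch, not from the original; `file`, and how the mapper and transaction manager are injected, are assumptions):

```java
// Hypothetical usage sketch: a single-threaded EasyExcel read drives the listener.
// cityMapper and dataSourceTransactionManager are assumed to be Spring beans,
// and file an uploaded MultipartFile.
EasyExcel.read(file.getInputStream(), City.class,
                new LockCityDataListener(cityMapper, dataSourceTransactionManager))
        .sheet()
        .doRead();
```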
The mapper SQL is as follows:
```java
@Select("select column_count, version from version_lock")
Map<String, Object> getTotalData();
```

```xml
<insert id="insertCityAll" parameterType="java.util.List">
    insert into city(`name`, parent_name, `number`, column_1, column_2, mark_id)
    values
    <foreach collection="list" separator="," item="item">
        (#{item.name}, #{item.parentName}, #{item.number}, #{item.test}, #{item.test2}, #{item.markId})
    </foreach>
</insert>
```
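The setData mapper is not shown in the source. Judging from how it is called, it is presumably a version-guarded update along these lines (a sketch, assuming a single-row `version_lock(column_count, version)` table):

```java
// Hypothetical sketch of the setData mapper used above: advance column_count to
// the new high-water mark, but only if the version read earlier is still current.
// Returns 0 (no rows matched) when another transaction bumped the version first.
@Update("update version_lock set column_count = #{resultNumber}, version = version + 1 "
        + "where version = #{version}")
int setData(@Param("version") int version, @Param("resultNumber") int resultNumber);
```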
Problem Description
When two threads insert at the same time, any conflict sends one thread into an infinite loop while the other waits for a lock until it times out.
Root Cause
The problem lies in the following code:
```java
Map<String, Object> map = cityMapper.getTotalData();
int currentData = (int) map.get("column_count");
int rowNumber = analysisContext.readSheetHolder().getApproximateTotalRowNumber() - 1;
int resultNumber = currentData + rowNumber;

int res = cityMapper.setData((Integer) map.get("version"), resultNumber);
while (res == 0) {
    map = cityMapper.getTotalData();
    resultNumber = rowNumber + (int) map.get("column_count");
    res = cityMapper.setData((Integer) map.get("version"), resultNumber);
}
```
Think about what res == 0 means: two threads read the row at the same time and both tried to update it, but only one update could succeed. The other thread, whose version number is now stale, drops into the while loop.

Here is the key point. When the loop is entered, the current transaction (call it A) has not ended yet. The other thread has already committed its transaction (call it B), but under MySQL's default isolation level, REPEATABLE READ, changes made by other transactions (B) are invisible to the current transaction (A). So even though A re-executes the query inside the loop, it reads from the same snapshot it took before the loop and gets back exactly the same data. The UPDATE, by contrast, performs a current read, so it does see the latest data, including B's new version. The version A supplies is therefore always stale, and the update can never succeed.
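To make this concrete, here is a minimal, self-contained sketch of the two-transaction interleaving (my illustration, not from the original; it assumes MySQL/InnoDB at the default REPEATABLE READ level, a seeded `version_lock` row with version = 1, and placeholder connection credentials):

```java
import java.sql.*;

public class RepeatableReadDemo {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://localhost:3306/test"; // hypothetical DSN
        try (Connection a = DriverManager.getConnection(url, "root", "root");
             Connection b = DriverManager.getConnection(url, "root", "root")) {
            a.setAutoCommit(false);
            b.setAutoCommit(false);

            // Transaction A takes its snapshot on the first read: sees version = 1
            int seen = readVersion(a);

            // Transaction B wins the race, bumps the version to 2, and commits
            try (Statement s = b.createStatement()) {
                s.executeUpdate("update version_lock set version = version + 1");
            }
            b.commit();

            // A re-reads: the snapshot still says 1, even though the row is now 2
            seen = readVersion(a);

            // A's guarded UPDATE does a current read, sees version = 2, matches nothing
            try (PreparedStatement ps = a.prepareStatement(
                    "update version_lock set version = version + 1 where version = ?")) {
                ps.setInt(1, seen);
                // Prints: snapshot=1, rows updated=0 -- retrying inside the same
                // transaction can never escape this state.
                System.out.println("snapshot=" + seen + ", rows updated=" + ps.executeUpdate());
            }
            a.rollback();
        }
    }

    private static int readVersion(Connection c) throws SQLException {
        try (Statement s = c.createStatement();
             ResultSet rs = s.executeQuery("select version from version_lock")) {
            rs.next();
            return rs.getInt(1);
        }
    }
}
```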
That is exactly the failure mode described above: one thread spins in the loop forever, and because that spinning transaction never commits, the other thread is left waiting on the locks it holds until the lock wait times out and the connection is dropped.