假如我们要实现一下功能:

image-20230514190414839

如果单线程处理,那么执行图如下:

image-20230514190537547

如果想要优化,则可以将获取order的操作并行执行,如下图:

image-20230514190619676

大致的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
while (存在未对账订单) {
// 查询未对账订单
Thread T1 = new Thread(()->{
pos = getPOrders();
});
T1.start();
// 查询派送单
Thread T2 = new Thread(()->{
dos = getDOrders();
});
T2.start();
// 等待T1、T2结束
T1.join();
T2.join();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}

但是这样写有一个缺点,就是while循环每次都要创建线程,而创建线程开销又比较的大,所以我们可以用线程池有优化。

用 CountDownLatch 实现线程等待

因为没有手动创建线程,所以没有join方法,那么如何通知主线程两个子线程已经执行完成了呢?我们可以使用CountDownLatch

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

// 创建2个线程的线程池
Executor executor = Executors.newFixedThreadPool(2);
while (存在未对账订单) {
// 计数器初始化为2
CountDownLatch latch = new CountDownLatch(2);
// 查询未对账订单
executor.execute(()-> {
pos = getPOrders();
latch.countDown();
});
// 查询派送单
executor.execute(()-> {
dos = getDOrders();
latch.countDown();
});

// 等待两个查询操作结束
latch.await();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}

进一步优化

在查询完第一次后进行对比时,可以进行第二次查询,如下图:

image-20230514191455335

这里有点生产者-消费者的意思了,查询 可以当作生产者,查询玩的计算可以当作消费者。既然是生产者 - 消费者模型,那就需要有个队列,来保存生产者生产的数据,而消费者则从这个队列消费数据。

我们可以用两个队列,一个存储查询订单,一个存储派送订单,然后两个线程,当每个线程都查询一条数据时,通知第三个线程去执行计算。但是这样需要保证查询线程速度一样。

用 CyclicBarrier 实现线程同步

线程 T1 负责查询订单,当查出一条时,调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;线程 T2 负责查询派送单,当查出一条时,也调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;当 T1 和 T2 都调用 barrier.await() 的时候,计数器会减到 0,此时 T1 和 T2 就可以执行下一条语句了,同时会调用 barrier 的回调函数来执行对账操作。

CyclicBarrier 的计数器有自动重置的功能,当减到 0 的时候,会自动重置你设置的初始值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 订单队列
Vector<P> pos;
// 派送单队列
Vector<D> dos;
// 执行回调的线程池, 这里好像必须为1
Executor executor = Executors.newFixedThreadPool(1);
// 计数器的值为2
final CyclicBarrier barrier = new CyclicBarrier(2, ()->{
executor.execute(()->check());
});

// 判断时,将队列中的元素移除
void check(){
P p = pos.remove(0);
D d = dos.remove(0);
// 执行对账操作
diff = check(p, d);
// 差异写入差异库
save(diff);
}

void checkAll(){
// 循环查询订单库
Thread T1 = new Thread(()->{
while(存在未对账订单){
// 查询订单库
pos.add(getPOrders());
// 等待
barrier.await();
}
});
T1.start();
// 循环查询运单库
Thread T2 = new Thread(()->{
while(存在未对账订单){
// 查询运单库
dos.add(getDOrders());
// 等待
barrier.await();
}
});
T2.start();
}

CountDownLatch 主要用来解决一个线程等待多个线程的场景

CyclicBarrier 是一组线程之间互相等待

参考

《Java并发编程实战》