假如我们要实现一下功能:
如果单线程处理,那么执行图如下:
如果想要优化,则可以将获取order的操作并行执行,如下图:
大致的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| while (存在未对账订单) { Thread T1 = new Thread(()->{ pos = getPOrders(); }); T1.start(); Thread T2 = new Thread(()->{ dos = getDOrders(); }); T2.start(); T1.join(); T2.join(); diff = check(pos, dos); save(diff); }
|
但是这样写有一个缺点,就是while循环每次都要创建线程,而创建线程开销又比较的大,所以我们可以用线程池有优化。
用 CountDownLatch 实现线程等待
因为没有手动创建线程,所以没有join方法,那么如何通知主线程两个子线程已经执行完成了呢?我们可以使用CountDownLatch
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
Executor executor = Executors.newFixedThreadPool(2); while (存在未对账订单) { CountDownLatch latch = new CountDownLatch(2); executor.execute(()-> { pos = getPOrders(); latch.countDown(); }); executor.execute(()-> { dos = getDOrders(); latch.countDown(); });
latch.await(); diff = check(pos, dos); save(diff); }
|
进一步优化
在查询完第一次后进行对比时,可以进行第二次查询,如下图:
这里有点生产者-消费者的意思了,查询 可以当作生产者,查询玩的计算可以当作消费者。既然是生产者 - 消费者模型,那就需要有个队列,来保存生产者生产的数据,而消费者则从这个队列消费数据。
我们可以用两个队列,一个存储查询订单,一个存储派送订单,然后两个线程,当每个线程都查询一条数据时,通知第三个线程去执行计算。但是这样需要保证查询线程速度一样。
用 CyclicBarrier 实现线程同步
线程 T1 负责查询订单,当查出一条时,调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;线程 T2 负责查询派送单,当查出一条时,也调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;当 T1 和 T2 都调用 barrier.await() 的时候,计数器会减到 0,此时 T1 和 T2 就可以执行下一条语句了,同时会调用 barrier 的回调函数来执行对账操作。
CyclicBarrier 的计数器有自动重置的功能,当减到 0 的时候,会自动重置你设置的初始值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| Vector<P> pos;
Vector<D> dos;
Executor executor = Executors.newFixedThreadPool(1);
final CyclicBarrier barrier = new CyclicBarrier(2, ()->{ executor.execute(()->check()); });
void check(){ P p = pos.remove(0); D d = dos.remove(0); diff = check(p, d); save(diff); }
void checkAll(){ Thread T1 = new Thread(()->{ while(存在未对账订单){ pos.add(getPOrders()); barrier.await(); } }); T1.start(); Thread T2 = new Thread(()->{ while(存在未对账订单){ dos.add(getDOrders()); barrier.await(); } }); T2.start(); }
|
CountDownLatch 主要用来解决一个线程等待多个线程的场景
CyclicBarrier 是一组线程之间互相等待
参考
《Java并发编程实战》