观察者模式有几种不同的实现方式:同步阻塞、异步非阻塞、进程内、进程间的实现方式。

同步阻塞是最经典的实现方式,主要是为了代码解耦;异步非阻塞除了能实现代码解耦之 外,还能提高代码的执行效率;进程间的观察者模式解耦更加彻底,一般是基于消息队列来 实现,用来实现不同进程间的被观察者和观察者之间的交互。

异步非阻塞观察者模式的简易实现

有两种实现方式:其中一种是:在每个 handleRegSuccess() 函数中创建一个新的线 程执行代码逻辑;另一种是:在 UserController 的 register() 函数中使用线程池来执行每 个观察者的 handleRegSuccess() 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 第一种实现方式,其他类代码不变,就没有再重复罗列
public class RegPromotionObserver implements RegObserver {
private PromotionService promotionService; // 依赖注入
@Override
public void handleRegSuccess(long userId) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
promotionService.issueNewUserExperienceCash(userId);
}
});
thread.start();
}
}

// 第二种实现方式,其他类代码不变,就没有再重复罗列
public class UserController {
private UserService userService; // 依赖注入
private List<RegObserver> regObservers = new ArrayList<>();
private Executor executor;
public UserController(Executor executor) {
this.executor = executor;
}
public void setRegObservers(List<RegObserver> observers) {
regObservers.addAll(observers);
}
public Long register(String telephone, String password) {
//省略输入参数的校验代码
//省略userService.register()异常的try-catch代码
long userId = userService.register(telephone, password);
for (RegObserver observer : regObservers) {
executor.execute(new Runnable() {
@Override
public void run() {
observer.handleRegSuccess(userId);
}
});
}
return userId;
}
}

对于第一种实现方式,频繁地创建和销毁线程比较耗时,并且并发线程数无法控制,创建过多的线程会导致堆栈溢出。

第二种实现方式,尽管利用了线程池解决了第一种实现方式的问题,但线程池、异步执行逻辑都耦合在了 register() 函数中,增加了这部分业务代码的维护成本。

EventBus核心

image-20230619110451041

image-20230619110502970

从图中我们可以看出,最关键的一个数据结构是 Observer 注册表,记录了消息类型和可接收消息函数的对应关系。

当调用 register() 函数注册观察者的时候,EventBus 通过解析 @Subscribe 注解,生成 Observer 注册表。

当调用 post() 函数发送消息的时候, EventBus 通过注册表找到相应的可接收消息的函数,然后通过 Java 的反射语法来动态地 创建对象、执行函数。

对于同步阻塞模式,EventBus 在一个线程内依次执行相应的函数。

对于异步非阻塞模式,EventBus 通过一个线程池来执行相应的函数。

参考

《设计模式之美》