最近在写项目的时候需要实现一个事件分发器,事件分发器的职责是订阅和发布消息。因此在这个模式中,订阅者需要订阅自己感兴趣的事件,当事件生成的时候需要将事件发布给订阅者。这种模式的好处在于,订阅者和发布者的低耦合性,发布者不关心订阅者的具体行为,只关心订阅者所订阅的事件。订阅者也不关心发布者如何处理事件,只需要拿到自己想要的事件即可。
事件分发器主要有三个部分组成:

  1. 订阅者:订阅事件
  2. 发布者:发布事件
  3. 事件:数据的载体

设计

需要有一个数据结构存储所有的订阅者的事件类型,该数据类型中会存储事件类型和订阅者的键值对。

1
private Map<EventTypeEnum, LinkedHashMap<Integer, WeakReference<Object>>> events;

当事件发布之后,会根据上面的数据结构将数据分发给各个订阅者。所以还需要一个数据结构存储各种实时产生的事件,由于系统的事件中,对事件顺序要求极高,因此存储的数据结构应该是一个队列。

1
private Queue<Event> eventQueue;

事件是由事件源将数据入队,所以需要一个方法将事件放入事件队列中。

1
2
3
public boolean putEvent(Event event) {
return eventQueue.offer(event);
}

有了以上的数据结构和方法时候,那么需要订阅者的订阅功能和发布者的发布功能。实际上在这里,事件分发器同时处理了订阅者的订阅,以及将事件发布给订阅者。
订阅功能:将订阅者订阅的事件作为键,以及将订阅者的HashCode和实际对象作为值,存储在事件Map容器中。同时也支持取消事件订阅。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void subscribe(EventTypeEnum eventType, Object subscriber) {
if (!events.containsKey(eventType)) {
events.put(eventType, new LinkedHashMap<>());
}
// Add
events.get(eventType).put(subscriber.hashCode(), new WeakReference<>(subscriber));
}

public boolean unsubscribe(EventTypeEnum eventType, Object subscriber) {
if (events.containsKey(eventType)) {
events.get(eventType).remove(subscriber.hashCode());
return true;
}
return false;
}

发布功能:注意到,在事件订阅时,我设计了一个注解,订阅者需要订阅的方法上,需要使用该注解,并说明需要订阅的事件类型。运行时使用,并用在对象方法上。发布事件时,事件分发器会通过反射调用使用该注解的方法,达到发布的效果。

1
2
3
4
5
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface OnEvent {
EventTypeEnum eventType() default EventTypeEnum.EVENT_NONE;
}

从事件队列中一一取出事件,通过订阅者订阅方法上的注解及其参数,将匹配的事件发布给订阅者。事件发布事件后,各种订阅者处理时间不太一样,有可能某些订阅的处理是阻塞的。因此我在发布事件的时,设计了异步处理事件对象。对于事件的发布,会有一个线程专门处理事件队列,不断发布事件,并将事件出队,避免事件重复发布。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
  private void publish(Event event) {
if (event != null && events.containsKey(event.getType())) {
for (Map.Entry<Integer, WeakReference<Object>> subs : events.get(event.getType()).entrySet()) {
WeakReference<Object> subscriberRef = subs.getValue();
Object subscriberObj = subscriberRef.get();
assert subscriberObj != null;
for (final Method method : subscriberObj.getClass().getDeclaredMethods()) {
OnEvent annotation = method.getAnnotation(OnEvent.class);
if (annotation != null && annotation.eventType().equals(event.getType())) {
// 异步处理事件
CompletableFuture.runAsync(() -> deliverEvent(subscriberObj, method, event));
}
}
}
}
}

private <T> boolean deliverEvent(T subscriber, Method method, Event event) {
try {
boolean methodFound = false;
for (final Class paramClass : method.getParameterTypes()) {
if (paramClass.equals(event.getClass())) {
methodFound = true;
break;
}
}
if (methodFound) {
method.setAccessible(true);
method.invoke(subscriber, event);
}

return true;
} catch (Exception e) {
logger.error("Deliver message has failed", e);
}
return false;
}

对于事件分发器,还有开始运行和结束功能,以及时间事件的发布和订阅,具体可以参考示例代码。

代码及示例

如上图所示,事件分发器在运行之后,会将时间事件自动分发给subscriber。
示例代码:https://github.com/SouthernYard/EventDispatcher