最近在写项目的时候需要实现一个事件分发器,事件分发器的职责是订阅和发布消息。因此在这个模式中,订阅者需要订阅自己感兴趣的事件,当事件生成的时候需要将事件发布给订阅者。这种模式的好处在于,订阅者和发布者的低耦合性,发布者不关心订阅者的具体行为,只关心订阅者所订阅的事件。订阅者也不关心发布者如何处理事件,只需要拿到自己想要的事件即可。
事件分发器主要有三个部分组成:
- 订阅者:订阅事件
- 发布者:发布事件
- 事件:数据的载体
设计
需要有一个数据结构存储所有的订阅者的事件类型,该数据类型中会存储事件类型和订阅者的键值对。
1
| private Map<EventTypeEnum, LinkedHashMap<Integer, WeakReference<Object>>> events;
|
当事件发布之后,会根据上面的数据结构将数据分发给各个订阅者。所以还需要一个数据结构存储各种实时产生的事件,由于系统的事件中,对事件顺序要求极高,因此存储的数据结构应该是一个队列。
1
| private Queue<Event> eventQueue;
|
事件是由事件源将数据入队,所以需要一个方法将事件放入事件队列中。
1 2 3
| public boolean putEvent(Event event) { return eventQueue.offer(event); }
|
有了以上的数据结构和方法时候,那么需要订阅者的订阅功能和发布者的发布功能。实际上在这里,事件分发器同时处理了订阅者的订阅,以及将事件发布给订阅者。
订阅功能
:将订阅者订阅的事件作为键,以及将订阅者的HashCode和实际对象作为值,存储在事件Map容器中。同时也支持取消事件订阅。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public void subscribe(EventTypeEnum eventType, Object subscriber) { if (!events.containsKey(eventType)) { events.put(eventType, new LinkedHashMap<>()); } events.get(eventType).put(subscriber.hashCode(), new WeakReference<>(subscriber)); }
public boolean unsubscribe(EventTypeEnum eventType, Object subscriber) { if (events.containsKey(eventType)) { events.get(eventType).remove(subscriber.hashCode()); return true; } return false; }
|
发布功能
:注意到,在事件订阅时,我设计了一个注解,订阅者需要订阅的方法上,需要使用该注解,并说明需要订阅的事件类型。运行时使用,并用在对象方法上。发布事件时,事件分发器会通过反射调用使用该注解的方法,达到发布的效果。
1 2 3 4 5
| @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface OnEvent { EventTypeEnum eventType() default EventTypeEnum.EVENT_NONE; }
|
从事件队列中一一取出事件,通过订阅者订阅方法上的注解及其参数,将匹配的事件发布给订阅者。事件发布事件后,各种订阅者处理时间不太一样,有可能某些订阅的处理是阻塞的。因此我在发布事件的时,设计了异步处理事件对象。对于事件的发布,会有一个线程专门处理事件队列,不断发布事件,并将事件出队,避免事件重复发布。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| private void publish(Event event) { if (event != null && events.containsKey(event.getType())) { for (Map.Entry<Integer, WeakReference<Object>> subs : events.get(event.getType()).entrySet()) { WeakReference<Object> subscriberRef = subs.getValue(); Object subscriberObj = subscriberRef.get(); assert subscriberObj != null; for (final Method method : subscriberObj.getClass().getDeclaredMethods()) { OnEvent annotation = method.getAnnotation(OnEvent.class); if (annotation != null && annotation.eventType().equals(event.getType())) {
CompletableFuture.runAsync(() -> deliverEvent(subscriberObj, method, event)); } } } } } private <T> boolean deliverEvent(T subscriber, Method method, Event event) { try { boolean methodFound = false; for (final Class paramClass : method.getParameterTypes()) { if (paramClass.equals(event.getClass())) { methodFound = true; break; } } if (methodFound) { method.setAccessible(true); method.invoke(subscriber, event); }
return true; } catch (Exception e) { logger.error("Deliver message has failed", e); } return false; }
|
对于事件分发器,还有开始运行和结束功能,以及时间事件的发布和订阅,具体可以参考示例代码。
代码及示例
如上图所示,事件分发器在运行之后,会将时间事件自动分发给subscriber。
示例代码:https://github.com/SouthernYard/EventDispatcher