
java EventBus 使用总结
EventBus是一个典型的事件分发器,Observer模式。订阅者通过register集中到EventBus类中,当发布者通过post MessageEvent时,通知到订阅者。适用于一对多,当一个消息需要被多次不同处理时使用。使用 EventBus 进行注册订阅者,发送事件,可以实现所有订阅者都收到了发送的事件,就是一个 发布/订阅 模式。EventBus 还有个实现类:AsyncEventB
介绍
EventBus是一个典型的事件分发器,Observer模式。订阅者通过register集中到EventBus类中,当发布者通过post MessageEvent时,通知到订阅者。适用于一对多,当一个消息需要被多次不同处理时使用。
使用方式
简单实用直接上代码
/**
* 创建一个自定义的事件
*/
public class EventOne {
private final String url;
public EventOne(String url) {
this.url = url;
}
public String getUrl(){
return this.url;
}
}
//创建两个订阅者,用来接收我们发送的自定义事件
/**
* 订阅者1
*/
public class ListenerOne {
@Subscribe
public void consume(EventOne event){
System.out.println("ListenerOne"+event.getUrl());
}
}
/**
* 订阅者2
*/
public class ListenerTwo {
@Subscribe
public void consume(EventOne event){
System.out.println("ListenerTwo"+event.getUrl());
}
}
//使用eventBus来把订阅者注册到EventBus类里面去,然后发送事件
public class GoogleEventBus {
public static void main(String[] args) {
EventBus eventBus = new EventBus(new SubscriberExceptionHandler(){
//这里是对订阅者抛出的异常做的自定义处理
@Override
public void handleException(Throwable exception, SubscriberExceptionContext context) {
System.out.println("处理异常,异常信息:"+ exception.getMessage());
}
});
eventBus.register(new ListenerOne());
eventBus.register(new ListenerTwo());
eventBus.post(new EventOne("fjlajflkaj"));
}
}
//结果
ListenerOnefjlajflkaj
ListenerTwofjlajflkaj
总结扩展
使用 EventBus 进行注册订阅者,发送事件,可以实现所有订阅者都收到了发送的事件,就是一个 发布/订阅 模式。
EventBus 还有个实现类:AsyncEventBus
两者的区别在于,前者是同步调用订阅者的消费方法(会阻塞主线程),后者则是异步调用(不会阻塞主线程),下面我们通过源码分析一下它的实现原理
bus的结构
我们先看一下 EventBus和AsyncEventBus
EventBus:
(源码中代码多了显得太乱,这里只展示重要的部分)
executor:用来调用订阅者消费方法的线程池
exceptionHandler:自定义的异常处理
subscribers:用来存放订阅者信息
dispatcher:一个固定的对象(PerThreadQueuedDispatcher),EventBus提供的
我们new EventBus() 的时候 会给属性赋值
executor = MoreExecutors.directExecutor()
这里请记住他的execute方法
subscribers = new SubscriberRegistry(this);
就是创建了一个SubscriberRegistry对象,把自己放进了他的属性bus里了
dispatcher = Dispatcher.perThreadDispatchQueue()
创建了一个PerThreadQueuedDispatcher的对象
AsyncEventBus:
它继承了EventBus,和EventBus的区别有两点
1:executor是自定义的,new 的时候需要传进去
2:dispatcher不同,EventBus使用PerThreadQueuedDispatcher,AsyncEventBus使用LegacyAsyncDispatcher
看懂了bus的结构,那我们来看一下他是怎么把订阅者register到属性subscribers里去的
注册订阅者
我们先看第一步
这一步的意思是找到要注册的这个类里面所有带有@Subscribe注解的方法,然后获取方法的第一个参数,把方法的一些信息放进一个对象里(Subscriber)
比如这个类,他有两个@Subscribe注解的方法,被解析之后就变成了这样
methodsInListener = {EventOne.class:[Subscriber1,Subscriber2]}
事件对应方法的信息
那第一步之后的逻辑就好理解了
拿到解析好的数据之后,循环出来放进subscribers里,相同的事件类型放到同一个list里
最终存放在 subscribers 里的数据就是这个样子:
{
"Event1.class":[
"Subscriber1",
"Subscriber2",
"Subscriber3"
],
"Event2.class":[
"Subscriber1",
"Subscriber2",
"Subscriber3"
]
}
Event.class:事件类对象
Subscriber:订阅者消费方法信息
发布事件
post方法非常简单,先从subscribers里根据事件类型拿到所有的订阅者消费方法的信息,然后交给 dispatcher 的 dispatch 方法执行,那最主要的就是 dispatcher 的 dispatch 方法了。
这里是否还记得 EventBus 和 AsyncEventBus 的 dispatcher 是两个不同的对象!(不记得的话往上扒拉扒拉)
我们先看 EventBus 里的 PerThreadQueuedDispatcher 的 dispatch()
这里先是判断一下非空,然后放到一个ThreadLocal的队列里,最终从队列里取出来调了 Subscriber 的 dispatchEvent 方法(至于为什么中间加了两个ThreadLocal的属性,我目前还没看出来是啥原因,可能是怕多线程之间出现问题?)
dispatchEvent 方法就是交给线程池去执行,反射调用目标方法
AsyncEventBus 里的 LegacyAsyncDispatcher 的 dispatch()
这就好理解了,先放队列,然后从队列取,同上一样交给线程池去处理,然后method.invoke();
这里会不会有老表问一个问题?不管是EventBus还是AsyncEventBus,到最后都是交给线程池处理了,这不都是异步的吗?
如果有老表有这个疑问,可以再扒拉扒拉他们两个executor的execute方法有啥不一样!
彩蛋
最后给学习的老表提一个问题哈,AsyncEventBus异步是说多个不同事件之间的执行方式是异步的,那多个相同事件之间是不是异步的呢,EventBus提供了一个注解 @AllowConcurrentEvents 是解决多个相同事件之间异步的,至于他是怎么实现的,老表们可以自己带着问题去扒拉扒拉源码瞅瞅
封装
在学会了EventBus之后,老表们会不会有个想法呢?
你看我们每次都得往bus里注册订阅者,有多少个就得注册多少个,那我们能不能写个自动注册的呢,然后我们之后用的时候就能直接post就行了
直接上代码吧
public interface EventPark {
void post(Object eventObject);
void register(MyEventListener listener);
void unregister(MyEventListener listener);
}
------------------------------------------------------------------------------------------------------------------------------------------------------------
public abstract class AbstractEventPark implements EventPark, ApplicationListener<ApplicationReadyEvent> {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractEventPark.class);
protected abstract EventBus getEventBus();
protected abstract String getEventParkName();
@Override
public void register(MyEventListener listener) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Registering listeners to {}: {}", getEventParkName(), listener);
}
boolean noSubscribeMethod = true;
Class<?> clazz = AopUtils.getTargetClass(listener);
Method[] methods = clazz.getMethods();
for (Method method : methods) {
if (method.isAnnotationPresent(Subscribe.class)) {
noSubscribeMethod = false;
}
}
if (noSubscribeMethod) {
throw new RuntimeException(clazz + " must have an @Subscribe method!");
}
EventBus eventBus = getEventBus();
eventBus.register(listener);
}
@Override
public void unregister(MyEventListener listener) {
getEventBus().unregister(listener);
}
@Override
public void post(Object eventObject) {
getEventBus().post(eventObject);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("EventBus post event: {}", eventObject);
}
}
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
ConfigurableApplicationContext applicationContext = event.getApplicationContext();
Map<String, MyEventListener> beansOfType = applicationContext.getBeansOfType(MyEventListener.class);
Set<Map.Entry<String, MyEventListener>> entries = beansOfType.entrySet();
for (Map.Entry<String, MyEventListener> entry : entries) {
this.register(entry.getValue());
}
}
}
------------------------------------------------------------------------------------------------------------------------------------------------------------
/**
* <p>异步的事件注册、提交中心</p>
*
* <p>异步队列,不会阻塞发送 {@link #post(Object)} 线程,生产的消息会先 {@code add} 到 {@code Dispatcher#ConcurrentLinkedQueue} 队列里,然后 {@code pool} 队列,使用线程池处理。
* <p>消费者执行时会占用线程池队列,声明 @AllowConcurrentEvents 表示这个监听器支持多线程处理,否则同一时间只会处理一条消息。
*
* <p>如果没有监听者将发送死信事件 DeadEvent
* @author jingyu.li
* @date 2023-05-24
*/
@Component
public final class AsyncEventPark extends AbstractEventPark {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncEventPark.class);
private final AsyncEventBus asyncEventBus;
private final ThreadPoolExecutor threadPoolExecutor;
// public AsyncEventPark() {
// this(Runtime.getRuntime().availableProcessors() * 2 + 1);
// }
public AsyncEventPark() {
this(2);
}
public AsyncEventPark(int coreSize) {
this(new ThreadPoolExecutor(coreSize, coreSize, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new NamedThreadFactory("event-park")));
}
public AsyncEventPark(ThreadPoolExecutor executor) {
this.threadPoolExecutor = executor;
this.asyncEventBus = new AsyncEventBus("async-event-bus", executor);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("ThreadPoolExecutor current core pool size is {}", executor.getCorePoolSize());
}
}
@Override
protected EventBus getEventBus() {
return asyncEventBus;
}
@Override
protected String getEventParkName() {
return "AsyncEventPark";
}
public int getCorePoolSize() {
return this.threadPoolExecutor.getCorePoolSize();
}
public int getActiveCount() {
return this.threadPoolExecutor.getActiveCount();
}
public int getQueueSize() {
return this.threadPoolExecutor.getQueue().size();
}
}
------------------------------------------------------------------------------------------------------------------------------------------------------------
public interface MyEventListener<T extends Object> {
/**
* 消费消息事件
* @param event 事件对象
*/
void consume(T event);
}
订阅者只需要实现MyEventListener接口,并实现consume方法,再带上@Subscribe注解,就会被自动注册到bus里去
我们用的时候就在类中注入EventPark,调用EventPark.post()方法,就ok了
有兴趣的老表可以试一试,我这就不再解释了
对EventBus的介绍纯个人理解,有不对的地方欢迎评论区的老表讨论。
更多推荐
所有评论(0)