后端思维篇:如何抽取一个观察者模板

前言

1. 观察者模式定义

观察者模式,也可以称之为发布订阅模式,它在GoF 的《设计模式》中,是这么定义的:

Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically。

翻译过来就是:观察者模式定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被完成业务的更新。

观察者模式属于行为模式,一个对象(被观察者)的状态发生改变,所有的依赖对象(观察者对象)都将得到通知,进行广播通知。它的主要成员就是观察者和被观察者。

被观察者(Observerable):目标对象,状态发生变化时,将通知所有的观察者。

观察者(observer):接受被观察者的状态变化通知,执行预先定义的业务。

2. 观察者模式的应用场景

哪些场景我们可以考虑使用观察者模式呢?

我们日常生活中,其实就有观察者模式类似的例子。比如,我们订阅了报社一年的报纸。每天报社印刷好报纸,就送到我们手中。我们就是观察者,报社就是被观察者。

而日常开发中,观察者模式的使用场景主要表现在:完成一件事情后,通知处理某个逻辑。如,登陆成功发个IM消息,支付成功发个邮件消息或者发个抽奖消息,用户评论成功给他发个积分等等。

举个详细点的例子吧,登陆注册应该是最常见的业务场景了,我们就拿注册来说事,大家经常会遇到类似的场景,就是用户注册成功后,我们给用户发一条IM消息,又或者发个邮件等等,因此经常有如下的代码:

void register(User user){
insertRegisterUser(user);
sendIMMessage();
sendEmail();
}

这块代码会有什么问题呢?如果产品又加需求:现在注册成功的用户,再给用户发一条短信通知。于是你又得改register方法的代码了。这是不是违反了开闭原则啦。

void register(User user){
insertRegisterUser(user);
sendIMMessage();
sendMobileMessage();
sendEmail();
}

并且,如果调发短信的接口失败了,是不是又影响到用户注册了?!这时候,是不是得加个异步方法,异步发通知消息才好?其实这种场景,我们可以使用异步非阻塞的观察者模式优化的。

3. 如何实现一个简单的观察者模式

我们先来看下,简单的观察者模式如何实现。可以这么定义

  • 一个主题接口Subject(声明添加、删除、通知观察者方法)。
  • 一个Observer观察者接口。
  • 一个创建主题的类ObserverableImpl​(即被观察者),实现了Subject接口。
  • 各个观察者的差异化实现。

为了通俗易懂,可以这样理解观察者模式:就是被观察者(ObserverableImpl)做了一件事情,或者说发布了一个主题(Subject),然后这件事情通知到各个相关的不同的人(不同的观察者,Observer的差异化实现者)。

一个主题接口。

public interface Subject {

/**
* 添加观察者
* @param observer
*/
void addServer(Observer observer);

/**
* 移除观察者
* @param observer
*/
void removeServer(Observer observer);

/**
* 通知观察者
* @param msg
*/
void notifyAllObservers(String msg);

}

一个Observer接口。

/**
* 观察者
*
*/
public interface Observer {
/**
* 更新消息
* @param msg
*/
void update(String msg);
}

一个创建主题的类ObserverableImpl(即被观察者),同时有观察者列表的属性(其实就是说观察者要事先注册到被观察者)。

public class ObserverableImpl implements Subject {

/**
* 存储被观察者
*/
private List observers = new ArrayList();

@Override
public void addServer(Observer observer) {
observers.add(observer);
}

@Override
public void removeServer(Observer observer) {
observers.remove(observer);
}

@Override
public void notifyAllObservers(String msg) {
for (Observer observer : observers) {
observer.update(msg);
}
}
}

观察者的差异化实现,以及使用。

public class ObserverOneImpl implements Observer {
@Override
public void update(String msg) {
System.out.println("ObserverOne is notified,"+msg);
}
}

public class ObserverTwoImpl implements Observer {

@Override
public void update(String msg) {
System.out.println("ObserverTwo is notified,"+msg);
}
}

public class ObserverDemoTest {
public static void main(String[] args) {
Subject subject = new ObserverableImpl();
//添加观察者
subject.addObserver(new ObserverOneImpl());
subject.addObserver(new ObserverTwoImpl());
//通知
subject.notifyAllObservers("关注公众号:捡田螺的小男孩");
}
}
//输出
ObserverOne is notified,关注公众号:捡田螺的小男孩
ObserverTwo is notified,关注公众号:捡田螺的小男孩

就这样,我们实现了观察者模式啦,是不是很简单?不过上面的代码,只能算是观察者模式的模板代码,只能反映大体的设计思路。接下来,我们看下在工作中,是如何使用观察者模式的。

4. 工作中,如何使用观察者模式的

观察者模式的实现有两种方式,同步阻塞方式和异步非阻塞方式。第3小节就是一个同步阻塞方式的观察者模式。我们来看下,日常工作的例子:用户注册成功发消息的例子,如何实现。本小节分同步阻塞、异步阻塞、spring观察者模式三个方向探讨。

  • 同步阻塞方式的观察模式
  • 异步非阻塞方式的观察者模式
  • spring观察者模式应用

4.1 同步阻塞方式的观察模式

我们可以把用户注册,当做被观察者实现的逻辑,然后发消息就是观察者的实现逻辑。

假设有两个观察者,分  别是发QQ消息和手机消息,于是有以下代码:

public interface RegisterObserver {
void sendMsg(String msg);
}
@Service
public class ObserverMobileImpl implements RegisterObserver {
@Override
public void sendMsg(String msg) {
System.out.println("发送手机短信消息"+msg);
}
}
@Service
public class ObserverQQImpl implements RegisterObserver {
@Override
public void sendMsg(String msg) {
System.out.println("发送QQ消息"+msg);
}
}

直接可以通过spring的ApplicationContextAware,初始化观察者列表,然后用户注册成功,通知观察者即可。代码如下:

@RestController
public class UserController implements ApplicationContextAware{

@Autowired
private UserService userService;

//观察者列表
private Collection regObservers;

@RequestMapping("register")
public String register(UserParam userParam) {
//注册成功过(类似于被观察者,做了某件事)
userService.addUser(userParam);
//然后就开始通知各个观察者。
for(RegisterObserver temp:regObservers){
temp.sendMsg("注冊成功");
}
return "SUCCESS";
}

//利用spring的ApplicationContextAware,初始化所有观察者
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
regObservers = new ArrayList<>(applicationContext.getBeansOfType(RegisterObserver.class).values());
}
}

可以发现,观察者模式,就是将不同的行为代码解耦,也就是说将观察者和被观察者代码解耦。但是这里大家会发现,这是同步阻塞式的观察者模式,是有缺点的,比如发QQ消息异常,就会影响用户注册,或者发消息因为某些原因耗时,就影响了用户注册,所以可以考虑异步非阻塞的观察者模式。

4.2 异步非阻塞方式的观察者模式

如何实现异步非阻塞,最简单就是另开个线程嘛,即新开个线程或者线程池异步跑观察者通知。代码如下:

@RestController
public class UserController implements ApplicationContextAware{

@Autowired
private UserService userService;

private Collection regObservers;

private Executor executor = Executors.newFixedThreadPool(10);

@RequestMapping("register")
public String register(UserParam userParam) {
userService.addUser(userParam);
//异步通知每个观察者
for (RegisterObserver temp : regObservers) {
executor.execute(() -> {
temp.sendMsg("注冊成功");
});
}

return "SUCCESS";
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
regObservers = new ArrayList<>(applicationContext.getBeansOfType(RegisterObserver.class).values());
}
}

线程池实现的异步非阻塞方式,还是可以的,但是异步执行逻辑都耦合在了register()函数中,不是很优雅,也增加了这部分业务代码的维护成本。一般日常工作中,我们会用spring那一套观察者模式等。

4.3 spring观察者模式应用

spring的观察者模式使用也是比较简单的,就是先定义个事件,继承于ApplicationEvent:

public class MessageEvent extends ApplicationEvent {

public MessageEvent(Object source) {
super(source);
}
}

然后定义一个事件监听器MessageListener,类似于观察者,它实现ApplicationListener接口。

@Component
public class MessageListener implements ApplicationListener {
@Override
public void onApplicationEvent(MessageEvent messageEvent) {
System.out.println("用户注册成功,执行监听事件"+messageEvent.getSource());
}
}

用户注册成功后,applicationEventPublisher(类似于被观察者)发布事件即可,代码如下:

@RestController
public class UserController implements ApplicationContextAware{

@Autowired
private UserService userService;

@Autowired
private ApplicationEventPublisher applicationEventPublisher;

@RequestMapping("springListenRegister")
public String springListenRegister(UserParam userParam) {
System.out.println("开始注册");
userService.addUser(userParam);
//用户注册成功,发布事件
applicationEventPublisher.publishEvent(new MessageEvent("666"));
return "SUCCESS";
}

运行结果:

开始注册
用户注册成功,执行监听事件666

这个也是同步阻塞的方式实现的,等下下个小节先介绍完spring观察者模式的原理,田螺哥再来教大家如何抽取一个通用的异步非阻塞观察者模式哈。

5. Spring观察者模式原理

Spring 中实现的观察者模式包含三部分:分别是Event事件(相当于消息)、Listener监听者(相当于观察者)、Publisher发送者(相当于被观察者)。用个图表示就是这样:

这个ApplicationEvent是放到哪里的,监听者AppliactionListener是如何监听到的。接下来,我们来看下spring框架的观察者原理是怎样哈。

我们先来看下ApplicationEventPublisher源代码(被观察者/发布者)

@FunctionalInterface
public interface ApplicationEventPublisher {

default void publishEvent(ApplicationEvent event) {
publishEvent((Object) event);
}

void publishEvent(Object event);

}

ApplicationEventPublisher它只是一个函数式接口,我们再看下它接口方法的实现。它的具体实现类是AbstractApplicationContext,这个类代码有点多,我把关键部分代码贴出来了:

public abstract class AbstractApplicationContext extends ... {
//监听者(观察者列表)
private final Set> applicationListeners;

//构造器,初始化观察者列表
public AbstractApplicationContext() {
this.applicationListeners = new LinkedHashSet();
//...
}

//发布事件
public void publishEvent(ApplicationEvent event) {
this.publishEvent(event, (ResolvableType)null);
}

public void publishEvent(Object event) {
this.publishEvent(event, (ResolvableType)null);
}

//发布事件接口实现
protected void publishEvent(Object event, ResolvableType eventType) {
//...
Object applicationEvent;
if (event instanceof ApplicationEvent) {
//如果event是ApplicationEvent对象,或者是它的子类
applicationEvent = (ApplicationEvent)event;
} else {
// 如果不是ApplicationEvent对象或者它的子类,则将其包装成PayloadApplicationEvent事件,并获取对应的事件类型
applicationEvent = new PayloadApplicationEvent(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent)applicationEvent).getResolvableType();
}
}

if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
} else {
//真正的消息发送,是通过它。获取ApplicationEventMulticaster,调用multicastEvent方法广播事件
this.getApplicationEventMulticaster().multicastEvent(
(ApplicationEvent)applicationEvent, eventType);
}

//如果当前命名空间还有父亲节点,也需要给父亲推送该消息
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext)this.parent).publishEvent(event, eventType);
} else {
this.parent.publishEvent(event);
}
}
}

//添加观察者(监听者)
public void addApplicationListener(ApplicationListener listener) {
Assert.notNull(listener, "ApplicationListener must not be null");
if (this.applicationEventMulticaster != null) {
this.applicationEventMulticaster.addApplicationListener(listener);
} else {
this.applicationListeners.add(listener);
}
}

//观察者列表
public Collection> getApplicationListeners() {
return this.applicationListeners;
}

// 注册监听器
protected void registerListeners() {
//把提前存储好的监听器添加到监听器容器中到ApplicationEventMulticaster
for (ApplicationListener listener : getApplicationListeners()) {
getApplicationEventMulticaster().addApplicationListener(listener);
}
//获取类型是ApplicationListener的beanName集合,此处不会去实例化bean
String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
for (String listenerBeanName : listenerBeanNames) {
getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
}


Set earlyEventsToProcess = this.earlyApplicationEvents;
this.earlyApplicationEvents = null;
//如果存在earlyEventsToProcess,提前处理这些事件
if (!CollectionUtils.isEmpty(earlyEventsToProcess)) {
for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
getApplicationEventMulticaster().multicastEvent(earlyEvent);
}
}
}
}

通过以上代码,我们可以发现,真正的消息发送,实际上是通过事件广播器ApplicationEventMulticaster 这个接口来完成的。multicastEvent是主要方法,这个方法的实现在类SimpleApplicationEventMulticaster中,我们一起来看下源码:

public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {
...

//线程池
@Nullable
protected Executor getTaskExecutor() {
return this.taskExecutor;
}

public void setTaskExecutor(@Nullable Executor taskExecutor) {
this.taskExecutor = taskExecutor;
}


@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
// 根据event类型获取适合的监听器
Executor executor = getTaskExecutor();
for (ApplicationListener listener : getApplicationListeners(event, type)) {
if (executor != null) {
//如果executor不为空,异步调用执行监听器中的方法
executor.execute(() -> invokeListener(listener, event));
}
else {
//调用监听器的方法
invokeListener(listener, event);
}
}
}

protected void invokeListener(ApplicationListener listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
//如果存在ErrorHandler,调用监听器方法(会用try...catch包一下)
if (errorHandler != null) {
try {
doInvokeListener(listener, event);
}
catch (Throwable err) {
//如果抛出异常则调用ErrorHandler来处理异常。
errorHandler.handleError(err);
}
}
else {
否则直接调用监听器方法
doInvokeListener(listener, event);
}
}
...
}

可以发现,默认情况下,spring实现的观察者模式,同步阻塞的。如果想异步执行事件,可以自定义SimpleApplicationEventMulticaster,然后构造一下executor线程池就好啦。代码如下:

/**
* 公众号:捡田螺的小男孩
*/
@Component
public class ListenerConfig {

//把线程池赋值进去
@Bean
public SimpleApplicationEventMulticaster applicationEventMulticaster() {
SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
simpleApplicationEventMulticaster.setTaskExecutor(simpleAsyncTaskExecutor());
return simpleApplicationEventMulticaster;
}

@Bean
public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
return new SimpleAsyncTaskExecutor();
}
}

demo跑一下,运行结果:

注册开始
当前线程名称http-nio-8080-exec-1
注册结束
用户注册成功2,执行监听事件666Sat Jun 18 11:44:07 GMT+08:00 2022
当前线程名称:SimpleAsyncTaskExecutor-20
当前线程名称:SimpleAsyncTaskExecutor-19
用户注册成功,执行监听事件666Sat Jun 18 11:44:12 GMT+08:00 2022

如果手动新建SimpleApplicationEventMulticaster,并设置taskExecutor的话,所有的监听响应事件,都是异步执行的哦。而有些有些场景我们希望同步执行的,这时候这种实现方式就不好了。

其实spring提供了@Async注解,可以用来实现异步。具体怎么实现呢?其实很简单,只需要在配置类加上@EnableAsync,接着在需要异步执行的监听实现方法。加上@Async即可。代码实现如下:

/**
* 关注公众号:捡田螺的小男孩
* 更多实战干货
*/
@Component
@EnableAsync //配置类加上```@EnableAsync```
public class ListenerConfig2 {

@Bean
public SimpleApplicationEventMulticaster applicationEventMulticaster() {
SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
return simpleApplicationEventMulticaster;
}

}

@Component
public class MessageAsyncListener3 implements ApplicationListener {

@Async //方法异步注解
@Override
public void onApplicationEvent(MessageEvent messageEvent) {
System.out.println("用户注册成功3,执行监听事件" + messageEvent.getSource() + new Date());
System.out.println("当前线程名称:"+Thread.currentThread().getName());
}
}

日常开发中,异步执行也可以自己手动通过线程池来开启啦。回到我们本文的后端思维主题,如果每个开发,都自己定义观察者模式的实现,这种代码会很混乱,所以最好是实现一个可扩展,通用的观察者模板。

6. 基于spring观察者模式,抽取一个模板

接下来的最后小节,跟大家一起基于spring的观察者模式,一步一步实现并抽取个模板哈。

我们要基于spring实现观察者模式的话,就包括这三步:

定义Event事件(相当于消息),一般定义一个Event对象,继承ApplicationEvent。

定义Listener监听者(相当于观察者),实现接口ApplicationListener。

Publisher发送者(相当于被观察者),通过ApplicationEventPublisher发布。

6.1 定义Event事件对象

既然我们要抽取观察者模板,那肯定不是每个人自己写自己的Event,然后都去继承ApplicationEvent。

我们可以自己定义一个项目相关的,通用的BaseEvent类,然后一些相关通用的信息属性可以放进去,比如eventId或者流水号bizSeq什么的,都可以,看你们项目需要哈。以下代码,我定义一个空空如也的BaseEvent。

/**
* 关注公众号:捡田螺的小男孩
* 更多实战干货
* @desc : 事件基础对象
*/
public class BaseEvent extends ApplicationEvent {

public BaseEvent(Object source) {
super(source);
}

public BaseEvent() {
this("");
}
}

如果你的观察者模式,是注册成功之后,发个消息的,你就可以声明一个消息类事件对象RegisterMessageEvent,继承通用的BaseEvent即可。然后属性可以自定义就好,比如messageId。

public class RegisterMessageEvent  extends BaseEvent{

private String msgId;

public RegisterMessageEvent(String msgId) {
super();
this.msgId = msgId;
}

public String getMsgId() {
return msgId; 网站题目:后端思维篇:如何抽取一个观察者模板
分享URL:http://www.shufengxianlan.com/qtweb/news2/342102.html

网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联