新聞中心
前言
1. 觀察者模式定義
觀察者模式,也可以稱之為發(fā)布訂閱模式,它在GoF 的《設(shè)計(jì)模式》中,是這么定義的:

Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically。
翻譯過來就是:觀察者模式定義對(duì)象間的一種一對(duì)多的依賴關(guān)系,當(dāng)一個(gè)對(duì)象的狀態(tài)發(fā)生改變時(shí),所有依賴于它的對(duì)象都得到通知并被完成業(yè)務(wù)的更新。
觀察者模式屬于行為模式,一個(gè)對(duì)象(被觀察者)的狀態(tài)發(fā)生改變,所有的依賴對(duì)象(觀察者對(duì)象)都將得到通知,進(jìn)行廣播通知。它的主要成員就是觀察者和被觀察者。
被觀察者(Observerable):目標(biāo)對(duì)象,狀態(tài)發(fā)生變化時(shí),將通知所有的觀察者。
觀察者(observer):接受被觀察者的狀態(tài)變化通知,執(zhí)行預(yù)先定義的業(yè)務(wù)。
2. 觀察者模式的應(yīng)用場(chǎng)景
哪些場(chǎng)景我們可以考慮使用觀察者模式呢?
我們?nèi)粘I钪?,其?shí)就有觀察者模式類似的例子。比如,我們訂閱了報(bào)社一年的報(bào)紙。每天報(bào)社印刷好報(bào)紙,就送到我們手中。我們就是觀察者,報(bào)社就是被觀察者。
而日常開發(fā)中,觀察者模式的使用場(chǎng)景主要表現(xiàn)在:完成一件事情后,通知處理某個(gè)邏輯。如,登陸成功發(fā)個(gè)IM消息,支付成功發(fā)個(gè)郵件消息或者發(fā)個(gè)抽獎(jiǎng)消息,用戶評(píng)論成功給他發(fā)個(gè)積分等等。
舉個(gè)詳細(xì)點(diǎn)的例子吧,登陸注冊(cè)應(yīng)該是最常見的業(yè)務(wù)場(chǎng)景了,我們就拿注冊(cè)來說事,大家經(jīng)常會(huì)遇到類似的場(chǎng)景,就是用戶注冊(cè)成功后,我們給用戶發(fā)一條IM消息,又或者發(fā)個(gè)郵件等等,因此經(jīng)常有如下的代碼:
void register(User user){
insertRegisterUser(user);
sendIMMessage();
sendEmail();
}這塊代碼會(huì)有什么問題呢?如果產(chǎn)品又加需求:現(xiàn)在注冊(cè)成功的用戶,再給用戶發(fā)一條短信通知。于是你又得改register方法的代碼了。這是不是違反了開閉原則啦。
void register(User user){
insertRegisterUser(user);
sendIMMessage();
sendMobileMessage();
sendEmail();
}并且,如果調(diào)發(fā)短信的接口失敗了,是不是又影響到用戶注冊(cè)了?!這時(shí)候,是不是得加個(gè)異步方法,異步發(fā)通知消息才好?其實(shí)這種場(chǎng)景,我們可以使用異步非阻塞的觀察者模式優(yōu)化的。
3. 如何實(shí)現(xiàn)一個(gè)簡(jiǎn)單的觀察者模式
我們先來看下,簡(jiǎn)單的觀察者模式如何實(shí)現(xiàn)??梢赃@么定義
- 一個(gè)主題接口Subject(聲明添加、刪除、通知觀察者方法)。
- 一個(gè)Observer觀察者接口。
- 一個(gè)創(chuàng)建主題的類ObserverableImpl?(即被觀察者),實(shí)現(xiàn)了Subject接口。
- 各個(gè)觀察者的差異化實(shí)現(xiàn)。
為了通俗易懂,可以這樣理解觀察者模式:就是被觀察者(ObserverableImpl)做了一件事情,或者說發(fā)布了一個(gè)主題(Subject),然后這件事情通知到各個(gè)相關(guān)的不同的人(不同的觀察者,Observer的差異化實(shí)現(xiàn)者)。
一個(gè)主題接口。
public interface Subject {
/**
* 添加觀察者
* @param observer
*/
void addServer(Observer observer);
/**
* 移除觀察者
* @param observer
*/
void removeServer(Observer observer);
/**
* 通知觀察者
* @param msg
*/
void notifyAllObservers(String msg);
}一個(gè)Observer接口。
/**
* 觀察者
*
*/
public interface Observer {
/**
* 更新消息
* @param msg
*/
void update(String msg);
}
一個(gè)創(chuàng)建主題的類ObserverableImpl(即被觀察者),同時(shí)有觀察者列表的屬性(其實(shí)就是說觀察者要事先注冊(cè)到被觀察者)。
public class ObserverableImpl implements Subject {
/**
* 存儲(chǔ)被觀察者
*/
private List observers = new ArrayList();
@Override
public void addServer(Observer observer) {
observers.add(observer);
}
@Override
public void removeServer(Observer observer) {
observers.remove(observer);
}
@Override
public void notifyAllObservers(String msg) {
for (Observer observer : observers) {
observer.update(msg);
}
}
} 觀察者的差異化實(shí)現(xiàn),以及使用。
public class ObserverOneImpl implements Observer {
@Override
public void update(String msg) {
System.out.println("ObserverOne is notified,"+msg);
}
}
public class ObserverTwoImpl implements Observer {
@Override
public void update(String msg) {
System.out.println("ObserverTwo is notified,"+msg);
}
}
public class ObserverDemoTest {
public static void main(String[] args) {
Subject subject = new ObserverableImpl();
//添加觀察者
subject.addObserver(new ObserverOneImpl());
subject.addObserver(new ObserverTwoImpl());
//通知
subject.notifyAllObservers("關(guān)注公眾號(hào):撿田螺的小男孩");
}
}
//輸出
ObserverOne is notified,關(guān)注公眾號(hào):撿田螺的小男孩
ObserverTwo is notified,關(guān)注公眾號(hào):撿田螺的小男孩就這樣,我們實(shí)現(xiàn)了觀察者模式啦,是不是很簡(jiǎn)單?不過上面的代碼,只能算是觀察者模式的模板代碼,只能反映大體的設(shè)計(jì)思路。接下來,我們看下在工作中,是如何使用觀察者模式的。
4. 工作中,如何使用觀察者模式的
觀察者模式的實(shí)現(xiàn)有兩種方式,同步阻塞方式和異步非阻塞方式。第3小節(jié)就是一個(gè)同步阻塞方式的觀察者模式。我們來看下,日常工作的例子:用戶注冊(cè)成功發(fā)消息的例子,如何實(shí)現(xiàn)。本小節(jié)分同步阻塞、異步阻塞、spring觀察者模式三個(gè)方向探討。
- 同步阻塞方式的觀察模式
- 異步非阻塞方式的觀察者模式
- spring觀察者模式應(yīng)用
4.1 同步阻塞方式的觀察模式
我們可以把用戶注冊(cè),當(dāng)做被觀察者實(shí)現(xiàn)的邏輯,然后發(fā)消息就是觀察者的實(shí)現(xiàn)邏輯。
假設(shè)有兩個(gè)觀察者,分 別是發(fā)QQ消息和手機(jī)消息,于是有以下代碼:
public interface RegisterObserver {
void sendMsg(String msg);
}
@Service
public class ObserverMobileImpl implements RegisterObserver {
@Override
public void sendMsg(String msg) {
System.out.println("發(fā)送手機(jī)短信消息"+msg);
}
}
@Service
public class ObserverQQImpl implements RegisterObserver {
@Override
public void sendMsg(String msg) {
System.out.println("發(fā)送QQ消息"+msg);
}
}直接可以通過spring的ApplicationContextAware,初始化觀察者列表,然后用戶注冊(cè)成功,通知觀察者即可。代碼如下:
@RestController
public class UserController implements ApplicationContextAware{
@Autowired
private UserService userService;
//觀察者列表
private CollectionregObservers;
@RequestMapping("register")
public String register(UserParam userParam) {
//注冊(cè)成功過(類似于被觀察者,做了某件事)
userService.addUser(userParam);
//然后就開始通知各個(gè)觀察者。
for(RegisterObserver temp:regObservers){
temp.sendMsg("注冊(cè)成功");
}
return "SUCCESS";
}
//利用spring的ApplicationContextAware,初始化所有觀察者
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
regObservers = new ArrayList<>(applicationContext.getBeansOfType(RegisterObserver.class).values());
}
}
可以發(fā)現(xiàn),觀察者模式,就是將不同的行為代碼解耦,也就是說將觀察者和被觀察者代碼解耦。但是這里大家會(huì)發(fā)現(xiàn),這是同步阻塞式的觀察者模式,是有缺點(diǎn)的,比如發(fā)QQ消息異常,就會(huì)影響用戶注冊(cè),或者發(fā)消息因?yàn)槟承┰蚝臅r(shí),就影響了用戶注冊(cè),所以可以考慮異步非阻塞的觀察者模式。
4.2 異步非阻塞方式的觀察者模式
如何實(shí)現(xiàn)異步非阻塞,最簡(jiǎn)單就是另開個(gè)線程嘛,即新開個(gè)線程或者線程池異步跑觀察者通知。代碼如下:
@RestController
public class UserController implements ApplicationContextAware{
@Autowired
private UserService userService;
private CollectionregObservers;
private Executor executor = Executors.newFixedThreadPool(10);
@RequestMapping("register")
public String register(UserParam userParam) {
userService.addUser(userParam);
//異步通知每個(gè)觀察者
for (RegisterObserver temp : regObservers) {
executor.execute(() -> {
temp.sendMsg("注冊(cè)成功");
});
}
return "SUCCESS";
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
regObservers = new ArrayList<>(applicationContext.getBeansOfType(RegisterObserver.class).values());
}
}
線程池實(shí)現(xiàn)的異步非阻塞方式,還是可以的,但是異步執(zhí)行邏輯都耦合在了register()函數(shù)中,不是很優(yōu)雅,也增加了這部分業(yè)務(wù)代碼的維護(hù)成本。一般日常工作中,我們會(huì)用spring那一套觀察者模式等。
4.3 spring觀察者模式應(yīng)用
spring的觀察者模式使用也是比較簡(jiǎn)單的,就是先定義個(gè)事件,繼承于ApplicationEvent:
public class MessageEvent extends ApplicationEvent {
public MessageEvent(Object source) {
super(source);
}
}然后定義一個(gè)事件監(jiān)聽器MessageListener,類似于觀察者,它實(shí)現(xiàn)ApplicationListener接口。
@Component
public class MessageListener implements ApplicationListener{
@Override
public void onApplicationEvent(MessageEvent messageEvent) {
System.out.println("用戶注冊(cè)成功,執(zhí)行監(jiān)聽事件"+messageEvent.getSource());
}
}
用戶注冊(cè)成功后,applicationEventPublisher(類似于被觀察者)發(fā)布事件即可,代碼如下:
@RestController
public class UserController implements ApplicationContextAware{
@Autowired
private UserService userService;
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
@RequestMapping("springListenRegister")
public String springListenRegister(UserParam userParam) {
System.out.println("開始注冊(cè)");
userService.addUser(userParam);
//用戶注冊(cè)成功,發(fā)布事件
applicationEventPublisher.publishEvent(new MessageEvent("666"));
return "SUCCESS";
}
運(yùn)行結(jié)果:
開始注冊(cè)
用戶注冊(cè)成功,執(zhí)行監(jiān)聽事件666
這個(gè)也是同步阻塞的方式實(shí)現(xiàn)的,等下下個(gè)小節(jié)先介紹完spring觀察者模式的原理,田螺哥再來教大家如何抽取一個(gè)通用的異步非阻塞觀察者模式哈。
5. Spring觀察者模式原理
Spring 中實(shí)現(xiàn)的觀察者模式包含三部分:分別是Event事件(相當(dāng)于消息)、Listener監(jiān)聽者(相當(dāng)于觀察者)、Publisher發(fā)送者(相當(dāng)于被觀察者)。用個(gè)圖表示就是這樣:
這個(gè)ApplicationEvent是放到哪里的,監(jiān)聽者AppliactionListener是如何監(jiān)聽到的。接下來,我們來看下spring框架的觀察者原理是怎樣哈。
我們先來看下ApplicationEventPublisher源代碼(被觀察者/發(fā)布者)
@FunctionalInterface
public interface ApplicationEventPublisher {
default void publishEvent(ApplicationEvent event) {
publishEvent((Object) event);
}
void publishEvent(Object event);
}
ApplicationEventPublisher它只是一個(gè)函數(shù)式接口,我們?cè)倏聪滤涌诜椒ǖ膶?shí)現(xiàn)。它的具體實(shí)現(xiàn)類是AbstractApplicationContext,這個(gè)類代碼有點(diǎn)多,我把關(guān)鍵部分代碼貼出來了:
public abstract class AbstractApplicationContext extends ... {
//監(jiān)聽者(觀察者列表)
private final Set> applicationListeners;
//構(gòu)造器,初始化觀察者列表
public AbstractApplicationContext() {
this.applicationListeners = new LinkedHashSet();
//...
}
//發(fā)布事件
public void publishEvent(ApplicationEvent event) {
this.publishEvent(event, (ResolvableType)null);
}
public void publishEvent(Object event) {
this.publishEvent(event, (ResolvableType)null);
}
//發(fā)布事件接口實(shí)現(xiàn)
protected void publishEvent(Object event, ResolvableType eventType) {
//...
Object applicationEvent;
if (event instanceof ApplicationEvent) {
//如果event是ApplicationEvent對(duì)象,或者是它的子類
applicationEvent = (ApplicationEvent)event;
} else {
// 如果不是ApplicationEvent對(duì)象或者它的子類,則將其包裝成PayloadApplicationEvent事件,并獲取對(duì)應(yīng)的事件類型
applicationEvent = new PayloadApplicationEvent(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent)applicationEvent).getResolvableType();
}
}
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
} else {
//真正的消息發(fā)送,是通過它。獲取ApplicationEventMulticaster,調(diào)用multicastEvent方法廣播事件
this.getApplicationEventMulticaster().multicastEvent(
(ApplicationEvent)applicationEvent, eventType);
}
//如果當(dāng)前命名空間還有父親節(jié)點(diǎn),也需要給父親推送該消息
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext)this.parent).publishEvent(event, eventType);
} else {
this.parent.publishEvent(event);
}
}
}
//添加觀察者(監(jiān)聽者)
public void addApplicationListener(ApplicationListener> listener) {
Assert.notNull(listener, "ApplicationListener must not be null");
if (this.applicationEventMulticaster != null) {
this.applicationEventMulticaster.addApplicationListener(listener);
} else {
this.applicationListeners.add(listener);
}
}
//觀察者列表
public Collection> getApplicationListeners() {
return this.applicationListeners;
}
// 注冊(cè)監(jiān)聽器
protected void registerListeners() {
//把提前存儲(chǔ)好的監(jiān)聽器添加到監(jiān)聽器容器中到ApplicationEventMulticaster
for (ApplicationListener> listener : getApplicationListeners()) {
getApplicationEventMulticaster().addApplicationListener(listener);
}
//獲取類型是ApplicationListener的beanName集合,此處不會(huì)去實(shí)例化bean
String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
for (String listenerBeanName : listenerBeanNames) {
getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
}
Set earlyEventsToProcess = this.earlyApplicationEvents;
this.earlyApplicationEvents = null;
//如果存在earlyEventsToProcess,提前處理這些事件
if (!CollectionUtils.isEmpty(earlyEventsToProcess)) {
for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
getApplicationEventMulticaster().multicastEvent(earlyEvent);
}
}
}
} 通過以上代碼,我們可以發(fā)現(xiàn),真正的消息發(fā)送,實(shí)際上是通過事件廣播器ApplicationEventMulticaster 這個(gè)接口來完成的。multicastEvent是主要方法,這個(gè)方法的實(shí)現(xiàn)在類SimpleApplicationEventMulticaster中,我們一起來看下源碼:
public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {
...
//線程池
@Nullable
protected Executor getTaskExecutor() {
return this.taskExecutor;
}
public void setTaskExecutor(@Nullable Executor taskExecutor) {
this.taskExecutor = taskExecutor;
}
@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
// 根據(jù)event類型獲取適合的監(jiān)聽器
Executor executor = getTaskExecutor();
for (ApplicationListener> listener : getApplicationListeners(event, type)) {
if (executor != null) {
//如果executor不為空,異步調(diào)用執(zhí)行監(jiān)聽器中的方法
executor.execute(() -> invokeListener(listener, event));
}
else {
//調(diào)用監(jiān)聽器的方法
invokeListener(listener, event);
}
}
}
protected void invokeListener(ApplicationListener> listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
//如果存在ErrorHandler,調(diào)用監(jiān)聽器方法(會(huì)用try...catch包一下)
if (errorHandler != null) {
try {
doInvokeListener(listener, event);
}
catch (Throwable err) {
//如果拋出異常則調(diào)用ErrorHandler來處理異常。
errorHandler.handleError(err);
}
}
else {
否則直接調(diào)用監(jiān)聽器方法
doInvokeListener(listener, event);
}
}
...
}可以發(fā)現(xiàn),默認(rèn)情況下,spring實(shí)現(xiàn)的觀察者模式,同步阻塞的。如果想異步執(zhí)行事件,可以自定義SimpleApplicationEventMulticaster,然后構(gòu)造一下executor線程池就好啦。代碼如下:
/**
* 公眾號(hào):撿田螺的小男孩
*/
@Component
public class ListenerConfig {
//把線程池賦值進(jìn)去
@Bean
public SimpleApplicationEventMulticaster applicationEventMulticaster() {
SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
simpleApplicationEventMulticaster.setTaskExecutor(simpleAsyncTaskExecutor());
return simpleApplicationEventMulticaster;
}
@Bean
public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
return new SimpleAsyncTaskExecutor();
}
}
demo跑一下,運(yùn)行結(jié)果:
注冊(cè)開始
當(dāng)前線程名稱http-nio-8080-exec-1
注冊(cè)結(jié)束
用戶注冊(cè)成功2,執(zhí)行監(jiān)聽事件666Sat Jun 18 11:44:07 GMT+08:00 2022
當(dāng)前線程名稱:SimpleAsyncTaskExecutor-20
當(dāng)前線程名稱:SimpleAsyncTaskExecutor-19
用戶注冊(cè)成功,執(zhí)行監(jiān)聽事件666Sat Jun 18 11:44:12 GMT+08:00 2022
如果手動(dòng)新建SimpleApplicationEventMulticaster,并設(shè)置taskExecutor的話,所有的監(jiān)聽響應(yīng)事件,都是異步執(zhí)行的哦。而有些有些場(chǎng)景我們希望同步執(zhí)行的,這時(shí)候這種實(shí)現(xiàn)方式就不好了。
其實(shí)spring提供了@Async注解,可以用來實(shí)現(xiàn)異步。具體怎么實(shí)現(xiàn)呢?其實(shí)很簡(jiǎn)單,只需要在配置類加上@EnableAsync,接著在需要異步執(zhí)行的監(jiān)聽實(shí)現(xiàn)方法。加上@Async即可。代碼實(shí)現(xiàn)如下:
/**
* 關(guān)注公眾號(hào):撿田螺的小男孩
* 更多實(shí)戰(zhàn)干貨
*/
@Component
@EnableAsync //配置類加上```@EnableAsync```
public class ListenerConfig2 {
@Bean
public SimpleApplicationEventMulticaster applicationEventMulticaster() {
SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
return simpleApplicationEventMulticaster;
}
}
@Component
public class MessageAsyncListener3 implements ApplicationListener{
@Async //方法異步注解
@Override
public void onApplicationEvent(MessageEvent messageEvent) {
System.out.println("用戶注冊(cè)成功3,執(zhí)行監(jiān)聽事件" + messageEvent.getSource() + new Date());
System.out.println("當(dāng)前線程名稱:"+Thread.currentThread().getName());
}
}
日常開發(fā)中,異步執(zhí)行也可以自己手動(dòng)通過線程池來開啟啦?;氐轿覀儽疚牡暮蠖怂季S主題,如果每個(gè)開發(fā),都自己定義觀察者模式的實(shí)現(xiàn),這種代碼會(huì)很混亂,所以最好是實(shí)現(xiàn)一個(gè)可擴(kuò)展,通用的觀察者模板。
6. 基于spring觀察者模式,抽取一個(gè)模板
接下來的最后小節(jié),跟大家一起基于spring的觀察者模式,一步一步實(shí)現(xiàn)并抽取個(gè)模板哈。
我們要基于spring實(shí)現(xiàn)觀察者模式的話,就包括這三步:
定義Event事件(相當(dāng)于消息),一般定義一個(gè)Event對(duì)象,繼承ApplicationEvent。
定義Listener監(jiān)聽者(相當(dāng)于觀察者),實(shí)現(xiàn)接口ApplicationListener。
Publisher發(fā)送者(相當(dāng)于被觀察者),通過ApplicationEventPublisher發(fā)布。
6.1 定義Event事件對(duì)象
既然我們要抽取觀察者模板,那肯定不是每個(gè)人自己寫自己的Event,然后都去繼承ApplicationEvent。
我們可以自己定義一個(gè)項(xiàng)目相關(guān)的,通用的BaseEvent類,然后一些相關(guān)通用的信息屬性可以放進(jìn)去,比如eventId或者流水號(hào)bizSeq什么的,都可以,看你們項(xiàng)目需要哈。以下代碼,我定義一個(gè)空空如也的BaseEvent。
/**
* 關(guān)注公眾號(hào):撿田螺的小男孩
* 更多實(shí)戰(zhàn)干貨
* @desc : 事件基礎(chǔ)對(duì)象
*/
public class BaseEvent extends ApplicationEvent {
public BaseEvent(Object source) {
super(source);
}
public BaseEvent() {
this("");
}
}
如果你的觀察者模式,是注冊(cè)成功之后,發(fā)個(gè)消息的,你就可以聲明一個(gè)消息類事件對(duì)象RegisterMessageEvent,繼承通用的BaseEvent即可。然后屬性可以自定義就好,比如messageId。
public class RegisterMessageEvent extends BaseEvent{
private String msgId;
public RegisterMessageEvent(String msgId) {
super();
this.msgId = msgId;
}
public String getMsgId() {
return msgId;
網(wǎng)站名稱:后端思維篇:如何抽取一個(gè)觀察者模板
當(dāng)前地址:http://m.5511xx.com/article/cdcegeh.html


咨詢
建站咨詢
