摘要:創(chuàng)建訂單時(shí)同步操作有查詢(xún)庫(kù)存,扣款,刷新庫(kù)存可異步的操作有通知風(fēng)控系統(tǒng),給買(mǎi)家發(fā)送扣款郵件和短信,通知賣(mài)家,創(chuàng)建一些定時(shí)任務(wù)。
同步轉(zhuǎn)異步是一種常見(jiàn)的優(yōu)化手段,最近一次在做調(diào)優(yōu)時(shí)便大量使用了這種方式。通常在一個(gè)業(yè)務(wù)場(chǎng)景中會(huì)包含多個(gè)操作,有些操作的結(jié)果需要讓用戶(hù)立馬知道,但有些操作則不需要。這些用戶(hù)不需要等待結(jié)果的操作,我們?cè)诰幊痰臅r(shí)候便可以異步處理。這么做最直接的效果就是縮短接口響應(yīng)速度,提升用戶(hù)體驗(yàn)。
我此次優(yōu)化的是下單場(chǎng)景。創(chuàng)建訂單時(shí)同步操作有: 查詢(xún)庫(kù)存,扣款,刷新庫(kù)存; 可異步的操作有: 通知風(fēng)控系統(tǒng),給買(mǎi)家發(fā)送扣款郵件和短信,通知賣(mài)家,創(chuàng)建一些定時(shí)任務(wù)。
最初我用的方案是Spring提供的@Async機(jī)制。這是一種很輕量的做法,只需要在可異步調(diào)用的方法上加上@Async注解即可。但是這種做法也存在兩個(gè)問(wèn)題: 1. 不支持類(lèi)內(nèi)部方法之間的調(diào)用。使用這種方式,我必須要把一些需要異步調(diào)用的方法轉(zhuǎn)移到一個(gè)新類(lèi)里,這點(diǎn)讓人不爽。2. 當(dāng)系統(tǒng)crash的時(shí)候,緩存的任務(wù)就丟了。因此,這個(gè)方案并不特別理想。
兩年之前用akka做過(guò)一個(gè)社交應(yīng)用的后端服務(wù),而且消息模型天生異步,所以自然想到了用akka。但是用akka的話(huà)也有一些地方需要注意。第一,Actor是單線(xiàn)程順序執(zhí)行,如果任務(wù)比較多最好使用actor router。actor router管理多個(gè)actor,可以做到一定限度的并行執(zhí)行。第二,使用有持久化actor,確保任務(wù)不會(huì)丟失。我會(huì)以發(fā)push提醒為例描述一下這個(gè)方案的實(shí)現(xiàn)細(xì)節(jié)。多數(shù)場(chǎng)景中發(fā)push提醒都可進(jìn)行異步調(diào)用。
下單邏輯都放在OrderService中,下單成功給賣(mài)家發(fā)送push提醒時(shí),Orderservice會(huì)給NotificationActor發(fā)送一個(gè)消息。
NotificationActor有兩個(gè)職責(zé):1. 保存接收到的任務(wù);2. 把消息轉(zhuǎn)發(fā)給NotificationWorker,當(dāng)Worker執(zhí)行成功之后把消息刪除。在最新版本的akka中可以使用At-Least-Once Delivery實(shí)現(xiàn)這兩個(gè)功能。
NotificationWorkerRouter僅僅處理發(fā)送邏輯。WorkerActor以Router方式進(jìn)行部署,以實(shí)現(xiàn)并行處理,提高處理效率。
下邊看一下具體實(shí)現(xiàn)細(xì)節(jié):
public class NotificationActor extends UntypedPersistentActorWithAtLeastOnceDelivery { private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); private ActorRef notificationWorkers = null; private final String uniqueId = UUID.randomUUID().toString(); @Autowired public NotificationActor(final ActorSystemManager actorSystemManager) { this.notificationWorkers = actorSystemManager.notificationWorkers; } @Override public String persistenceId() { return "journal:notification-actor:" + uniqueId; } @Override public void onReceiveRecover(final Object msg) throws Throwable { if (msg instanceof NotificationMessage) { deliverAckMessage((NotificationMessage) msg); } } @Override public void onReceiveCommand(final Object msg) throws Throwable { if (msg instanceof NotificationMessage) { persist(msg, m -> { deliverAckMessage((NotificationMessage) m); }); } else if (msg instanceof Confirm) { Confirm confirm = (Confirm) msg; confirmMessage(new MsgConfirmed(confirm.deliveryId)); } else if (msg instanceof UnconfirmedWarning) { UnconfirmedWarning warning = (UnconfirmedWarning) msg; warning.getUnconfirmedDeliveries().forEach(d -> { log.error("[NOTIFICATION-ACTOR] Unconfirmed Messages: {}", d.message()); confirmMessage(new MsgConfirmed(d.deliveryId())); }); } else { unhandled(msg); } } private void deliverAckMessage(NotificationMessage event) { deliver(notificationWorkers.path(), (Function) deliveryId -> new AckMessage(deliveryId, event)); } private void confirmMessage(final MsgConfirmed evt) { confirmDelivery(evt.deliveryId); deleteMessages(evt.deliveryId); } public interface NotificationMessage extends Event {} public static final @Data class PushMessage implements NotificationMessage { private final Long source; private final Long target; private final String trigger; private final ImmutableMap payload; } } public class NotificationWorkerActor extends UntypedActor { private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); private final @NonNull NotificationService notificationService; @Autowired public NotificationWorkerActor(final NotificationService notificationService) { this.notificationService = notificationService; } @Override public void onReceive(final Object event) throws Throwable { if (event instanceof AckMessage) { final AckMessage ackMessage = (AckMessage) event; NotificationMessage msg = (NotificationMessage) ackMessage.msg; log.info("[NOTIFICATION] receive message: {}", msg); if (msg instanceof PushMessage) { final PushMessage m = (PushMessage) msg; log.info("[NOTIFICATION] send push notification from: {} to: {}", m.getSource(), m.getTarget()); notificationService.notify(m.getSource(), m.getTarget(), m.getTrigger(), m.getPayload()); } sender().tell(new Confirm(ackMessage.deliveryId), self()); } else { unhandled(event); } } } public class OrderService { public void createOrder() { actorSystemManager.notificationActor.tell( new PushMessage(), ActorRef.noSender() ); } }
最早實(shí)施這個(gè)方案的時(shí)候遇到一個(gè)問(wèn)題,說(shuō)一下這個(gè)問(wèn)題如何產(chǎn)生的。我們一共有三臺(tái)服務(wù)器,三臺(tái)服務(wù)器都會(huì)部署同樣的代碼,以NotificationActor為例,它會(huì)分別部署在三個(gè)機(jī)器上。actor journal我們使用mysql存儲(chǔ)。akka persistent actor內(nèi)部有一個(gè)sequence number用來(lái)對(duì)接收到的消息進(jìn)行計(jì)數(shù),這個(gè)數(shù)字是遞增的。同時(shí)這個(gè)數(shù)字也會(huì)在journal中記錄。最初我的persistenceId方法是這樣實(shí)現(xiàn)的:
@Override public String persistenceId() { return "journal:notification-actor"; }
那么,假如server1上的NotificationActor接收了一個(gè)消息,那么它的sequence number會(huì)變成1,mysql中將會(huì)存儲(chǔ)的sequence number為1的消息。這時(shí)server2上也接收到了一個(gè)消息,因?yàn)樗淖畛鮯equence number是0,所以它也會(huì)把現(xiàn)在接收到的消息的sequence number設(shè)置為1。但是顯然這條消息是不能持久化的,因?yàn)樗蛿?shù)據(jù)庫(kù)記錄的sequence number沖突了。根本原因是三臺(tái)服務(wù)器上的NotificationActor的persistenceId是一樣的。
上邊代碼中給出了一種方案,把persistenceId變成random的,每次actor啟動(dòng)的時(shí)候都會(huì)得到不同的persistenceId,這樣就解決了上述問(wèn)題。還有一種方案是引入akka cluster,使用akka singleton。這種方案會(huì)在下一篇文章中詳細(xì)說(shuō)明。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/66470.html
摘要:接下來(lái)會(huì)選擇一個(gè)最老的實(shí)例并在上面創(chuàng)建單例。可以確保整個(gè)集群中至多有一個(gè)的實(shí)例,言下之意,存在沒(méi)有實(shí)例的時(shí)刻。訪(fǎng)問(wèn)需要借助于,會(huì)把所有的消息給當(dāng)前被代理的實(shí)例。 上篇文章主要講了如何使用Akka作異步任務(wù)處理。最后還拋出一個(gè)問(wèn)題。 具體問(wèn)題的描述就不在這篇文章贅述了,我們僅簡(jiǎn)單回顧一下第一種解決方案:覆寫(xiě)persistenceId()時(shí),加一個(gè)UUID,這樣三臺(tái)服務(wù)器上的Actor就不...
摘要:接下來(lái)會(huì)選擇一個(gè)最老的實(shí)例并在上面創(chuàng)建單例??梢源_保整個(gè)集群中至多有一個(gè)的實(shí)例,言下之意,存在沒(méi)有實(shí)例的時(shí)刻。訪(fǎng)問(wèn)需要借助于,會(huì)把所有的消息給當(dāng)前被代理的實(shí)例。 上篇文章主要講了如何使用Akka作異步任務(wù)處理。最后還拋出一個(gè)問(wèn)題。 具體問(wèn)題的描述就不在這篇文章贅述了,我們僅簡(jiǎn)單回顧一下第一種解決方案:覆寫(xiě)persistenceId()時(shí),加一個(gè)UUID,這樣三臺(tái)服務(wù)器上的Actor就不...
摘要:本文介紹和點(diǎn)評(píng)上的等并發(fā)編程模型。異步更適合并發(fā)編程。同步使線(xiàn)程阻塞,導(dǎo)致等待。基本模型這是最簡(jiǎn)單的模型,創(chuàng)建線(xiàn)程來(lái)執(zhí)行一個(gè)任務(wù),完畢后銷(xiāo)毀線(xiàn)程。響應(yīng)式編程是一種面向數(shù)據(jù)流和變化傳播的編程模式。起源于電信領(lǐng)域的的編程模型。 本文介紹和點(diǎn)評(píng)JVM上的Thread, Thread Pool, Future, Rx, async-await, Fiber, Actor等并發(fā)編程模型。本人經(jīng)驗(yàn)...
摘要:本文介紹和點(diǎn)評(píng)上的等并發(fā)編程模型。異步更適合并發(fā)編程。同步使線(xiàn)程阻塞,導(dǎo)致等待?;灸P瓦@是最簡(jiǎn)單的模型,創(chuàng)建線(xiàn)程來(lái)執(zhí)行一個(gè)任務(wù),完畢后銷(xiāo)毀線(xiàn)程。響應(yīng)式編程是一種面向數(shù)據(jù)流和變化傳播的編程模式。起源于電信領(lǐng)域的的編程模型。 本文介紹和點(diǎn)評(píng)JVM上的Thread, Thread Pool, Future, Rx, async-await, Fiber, Actor等并發(fā)編程模型。本人經(jīng)驗(yàn)...
摘要:關(guān)于三者的一些概括總結(jié)離線(xiàn)分析框架,適合離線(xiàn)的復(fù)雜的大數(shù)據(jù)處理內(nèi)存計(jì)算框架,適合在線(xiàn)離線(xiàn)快速的大數(shù)據(jù)處理流式計(jì)算框架,適合在線(xiàn)的實(shí)時(shí)的大數(shù)據(jù)處理我是一個(gè)以架構(gòu)師為年之內(nèi)目標(biāo)的小小白。 整理自《架構(gòu)解密從分布式到微服務(wù)》第七章——聊聊分布式計(jì)算.做了相應(yīng)補(bǔ)充和修改。 [TOC] 前言 不管是網(wǎng)絡(luò)、內(nèi)存、還是存儲(chǔ)的分布式,它們最終目的都是為了實(shí)現(xiàn)計(jì)算的分布式:數(shù)據(jù)在各個(gè)計(jì)算機(jī)節(jié)點(diǎn)上流動(dòng),同...
閱讀 2063·2021-10-08 10:04
閱讀 3091·2021-09-22 10:02
閱讀 2245·2019-08-30 15:56
閱讀 834·2019-08-30 15:54
閱讀 931·2019-08-30 15:54
閱讀 1288·2019-08-30 15:53
閱讀 2516·2019-08-30 11:21
閱讀 3564·2019-08-30 10:56