成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

使用akka作異步任務(wù)處理

shiweifu / 1696人閱讀

摘要:創(chuàng)建訂單時(shí)同步操作有查詢(xún)庫(kù)存,扣款,刷新庫(kù)存可異步的操作有通知風(fēng)控系統(tǒng),給買(mǎi)家發(fā)送扣款郵件和短信,通知賣(mài)家,創(chuàng)建一些定時(shí)任務(wù)。

同步轉(zhuǎn)異步是一種常見(jiàn)的優(yōu)化手段,最近一次在做調(diào)優(yōu)時(shí)便大量使用了這種方式。通常在一個(gè)業(yè)務(wù)場(chǎng)景中會(huì)包含多個(gè)操作,有些操作的結(jié)果需要讓用戶(hù)立馬知道,但有些操作則不需要。這些用戶(hù)不需要等待結(jié)果的操作,我們?cè)诰幊痰臅r(shí)候便可以異步處理。這么做最直接的效果就是縮短接口響應(yīng)速度,提升用戶(hù)體驗(yàn)。

我此次優(yōu)化的是下單場(chǎng)景。創(chuàng)建訂單時(shí)同步操作有: 查詢(xún)庫(kù)存,扣款,刷新庫(kù)存; 可異步的操作有: 通知風(fēng)控系統(tǒng),給買(mǎi)家發(fā)送扣款郵件和短信,通知賣(mài)家,創(chuàng)建一些定時(shí)任務(wù)。

最初我用的方案是Spring提供的@Async機(jī)制。這是一種很輕量的做法,只需要在可異步調(diào)用的方法上加上@Async注解即可。但是這種做法也存在兩個(gè)問(wèn)題: 1. 不支持類(lèi)內(nèi)部方法之間的調(diào)用。使用這種方式,我必須要把一些需要異步調(diào)用的方法轉(zhuǎn)移到一個(gè)新類(lèi)里,這點(diǎn)讓人不爽。2. 當(dāng)系統(tǒng)crash的時(shí)候,緩存的任務(wù)就丟了。因此,這個(gè)方案并不特別理想。

兩年之前用akka做過(guò)一個(gè)社交應(yīng)用的后端服務(wù),而且消息模型天生異步,所以自然想到了用akka。但是用akka的話(huà)也有一些地方需要注意。第一,Actor是單線(xiàn)程順序執(zhí)行,如果任務(wù)比較多最好使用actor router。actor router管理多個(gè)actor,可以做到一定限度的并行執(zhí)行。第二,使用有持久化actor,確保任務(wù)不會(huì)丟失。我會(huì)以發(fā)push提醒為例描述一下這個(gè)方案的實(shí)現(xiàn)細(xì)節(jié)。多數(shù)場(chǎng)景中發(fā)push提醒都可進(jìn)行異步調(diào)用。

下單邏輯都放在OrderService中,下單成功給賣(mài)家發(fā)送push提醒時(shí),Orderservice會(huì)給NotificationActor發(fā)送一個(gè)消息。

NotificationActor有兩個(gè)職責(zé):1. 保存接收到的任務(wù);2. 把消息轉(zhuǎn)發(fā)給NotificationWorker,當(dāng)Worker執(zhí)行成功之后把消息刪除。在最新版本的akka中可以使用At-Least-Once Delivery實(shí)現(xiàn)這兩個(gè)功能。

NotificationWorkerRouter僅僅處理發(fā)送邏輯。WorkerActor以Router方式進(jìn)行部署,以實(shí)現(xiàn)并行處理,提高處理效率。

下邊看一下具體實(shí)現(xiàn)細(xì)節(jié):

public class NotificationActor extends UntypedPersistentActorWithAtLeastOnceDelivery {
    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    private ActorRef notificationWorkers = null;
    private final String uniqueId = UUID.randomUUID().toString();

    @Autowired
    public NotificationActor(final ActorSystemManager actorSystemManager) {
        this.notificationWorkers = actorSystemManager.notificationWorkers;
    }

    @Override public String persistenceId() {
        return "journal:notification-actor:" + uniqueId;
    }

    @Override public void onReceiveRecover(final Object msg) throws Throwable {
        if (msg instanceof NotificationMessage) {
            deliverAckMessage((NotificationMessage) msg);
        }
    }

    @Override public void onReceiveCommand(final Object msg) throws Throwable {
        if (msg instanceof NotificationMessage) {
            persist(msg, m -> { deliverAckMessage((NotificationMessage) m); });
        } else if (msg instanceof Confirm) {
            Confirm confirm = (Confirm) msg;
            confirmMessage(new MsgConfirmed(confirm.deliveryId));
        } else if (msg instanceof UnconfirmedWarning) {
            UnconfirmedWarning warning = (UnconfirmedWarning) msg;
            warning.getUnconfirmedDeliveries().forEach(d -> {
                log.error("[NOTIFICATION-ACTOR] Unconfirmed Messages: {}", d.message());

                confirmMessage(new MsgConfirmed(d.deliveryId()));
            });
        } else {
            unhandled(msg);
        }
    }

    private void deliverAckMessage(NotificationMessage event) {
        deliver(notificationWorkers.path(), (Function) deliveryId -> new AckMessage(deliveryId, event));
    }

    private void confirmMessage(final MsgConfirmed evt) {
        confirmDelivery(evt.deliveryId);
        deleteMessages(evt.deliveryId);
    }

    public interface NotificationMessage extends Event {}

    public static final @Data class PushMessage implements NotificationMessage {
        private final Long source;
        private final Long target;
        private final String trigger;
        private final ImmutableMap payload;
    }
}

public class NotificationWorkerActor extends UntypedActor {
    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    private final @NonNull NotificationService notificationService;

    @Autowired
    public NotificationWorkerActor(final NotificationService notificationService) {
        this.notificationService = notificationService;
    }

    @Override public void onReceive(final Object event) throws Throwable {
        if (event instanceof AckMessage) {
            final AckMessage ackMessage = (AckMessage) event;
            NotificationMessage msg = (NotificationMessage) ackMessage.msg;
            log.info("[NOTIFICATION] receive message: {}", msg);

            if (msg instanceof PushMessage) {
                final PushMessage m = (PushMessage) msg;
                log.info("[NOTIFICATION] send push notification from: {} to: {}", m.getSource(), m.getTarget());
                notificationService.notify(m.getSource(), m.getTarget(), m.getTrigger(), m.getPayload());
            }
            sender().tell(new Confirm(ackMessage.deliveryId), self());
        } else {
            unhandled(event);
        }
    }
}

public class OrderService {
    public void createOrder() {
        actorSystemManager.notificationActor.tell(
          new PushMessage(), ActorRef.noSender()
        );
    }
}

最早實(shí)施這個(gè)方案的時(shí)候遇到一個(gè)問(wèn)題,說(shuō)一下這個(gè)問(wèn)題如何產(chǎn)生的。我們一共有三臺(tái)服務(wù)器,三臺(tái)服務(wù)器都會(huì)部署同樣的代碼,以NotificationActor為例,它會(huì)分別部署在三個(gè)機(jī)器上。actor journal我們使用mysql存儲(chǔ)。akka persistent actor內(nèi)部有一個(gè)sequence number用來(lái)對(duì)接收到的消息進(jìn)行計(jì)數(shù),這個(gè)數(shù)字是遞增的。同時(shí)這個(gè)數(shù)字也會(huì)在journal中記錄。最初我的persistenceId方法是這樣實(shí)現(xiàn)的:

@Override public String persistenceId() {
    return "journal:notification-actor";
}

那么,假如server1上的NotificationActor接收了一個(gè)消息,那么它的sequence number會(huì)變成1,mysql中將會(huì)存儲(chǔ)的sequence number為1的消息。這時(shí)server2上也接收到了一個(gè)消息,因?yàn)樗淖畛鮯equence number是0,所以它也會(huì)把現(xiàn)在接收到的消息的sequence number設(shè)置為1。但是顯然這條消息是不能持久化的,因?yàn)樗蛿?shù)據(jù)庫(kù)記錄的sequence number沖突了。根本原因是三臺(tái)服務(wù)器上的NotificationActor的persistenceId是一樣的。

上邊代碼中給出了一種方案,把persistenceId變成random的,每次actor啟動(dòng)的時(shí)候都會(huì)得到不同的persistenceId,這樣就解決了上述問(wèn)題。還有一種方案是引入akka cluster,使用akka singleton。這種方案會(huì)在下一篇文章中詳細(xì)說(shuō)明。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/66470.html

相關(guān)文章

  • 使用Akka Cluster Singleton實(shí)現(xiàn)集群?jiǎn)卫?/b>

    摘要:接下來(lái)會(huì)選擇一個(gè)最老的實(shí)例并在上面創(chuàng)建單例。可以確保整個(gè)集群中至多有一個(gè)的實(shí)例,言下之意,存在沒(méi)有實(shí)例的時(shí)刻。訪(fǎng)問(wèn)需要借助于,會(huì)把所有的消息給當(dāng)前被代理的實(shí)例。 上篇文章主要講了如何使用Akka作異步任務(wù)處理。最后還拋出一個(gè)問(wèn)題。 具體問(wèn)題的描述就不在這篇文章贅述了,我們僅簡(jiǎn)單回顧一下第一種解決方案:覆寫(xiě)persistenceId()時(shí),加一個(gè)UUID,這樣三臺(tái)服務(wù)器上的Actor就不...

    xiangzhihong 評(píng)論0 收藏0
  • 使用Akka Cluster Singleton實(shí)現(xiàn)集群?jiǎn)卫?/b>

    摘要:接下來(lái)會(huì)選擇一個(gè)最老的實(shí)例并在上面創(chuàng)建單例??梢源_保整個(gè)集群中至多有一個(gè)的實(shí)例,言下之意,存在沒(méi)有實(shí)例的時(shí)刻。訪(fǎng)問(wèn)需要借助于,會(huì)把所有的消息給當(dāng)前被代理的實(shí)例。 上篇文章主要講了如何使用Akka作異步任務(wù)處理。最后還拋出一個(gè)問(wèn)題。 具體問(wèn)題的描述就不在這篇文章贅述了,我們僅簡(jiǎn)單回顧一下第一種解決方案:覆寫(xiě)persistenceId()時(shí),加一個(gè)UUID,這樣三臺(tái)服務(wù)器上的Actor就不...

    TZLLOG 評(píng)論0 收藏0
  • JVM并發(fā)編程模型覽

    摘要:本文介紹和點(diǎn)評(píng)上的等并發(fā)編程模型。異步更適合并發(fā)編程。同步使線(xiàn)程阻塞,導(dǎo)致等待。基本模型這是最簡(jiǎn)單的模型,創(chuàng)建線(xiàn)程來(lái)執(zhí)行一個(gè)任務(wù),完畢后銷(xiāo)毀線(xiàn)程。響應(yīng)式編程是一種面向數(shù)據(jù)流和變化傳播的編程模式。起源于電信領(lǐng)域的的編程模型。 本文介紹和點(diǎn)評(píng)JVM上的Thread, Thread Pool, Future, Rx, async-await, Fiber, Actor等并發(fā)編程模型。本人經(jīng)驗(yàn)...

    cppowboy 評(píng)論0 收藏0
  • JVM并發(fā)編程模型覽

    摘要:本文介紹和點(diǎn)評(píng)上的等并發(fā)編程模型。異步更適合并發(fā)編程。同步使線(xiàn)程阻塞,導(dǎo)致等待?;灸P瓦@是最簡(jiǎn)單的模型,創(chuàng)建線(xiàn)程來(lái)執(zhí)行一個(gè)任務(wù),完畢后銷(xiāo)毀線(xiàn)程。響應(yīng)式編程是一種面向數(shù)據(jù)流和變化傳播的編程模式。起源于電信領(lǐng)域的的編程模型。 本文介紹和點(diǎn)評(píng)JVM上的Thread, Thread Pool, Future, Rx, async-await, Fiber, Actor等并發(fā)編程模型。本人經(jīng)驗(yàn)...

    wudengzan 評(píng)論0 收藏0
  • 關(guān)于分布式計(jì)算的一些概念

    摘要:關(guān)于三者的一些概括總結(jié)離線(xiàn)分析框架,適合離線(xiàn)的復(fù)雜的大數(shù)據(jù)處理內(nèi)存計(jì)算框架,適合在線(xiàn)離線(xiàn)快速的大數(shù)據(jù)處理流式計(jì)算框架,適合在線(xiàn)的實(shí)時(shí)的大數(shù)據(jù)處理我是一個(gè)以架構(gòu)師為年之內(nèi)目標(biāo)的小小白。 整理自《架構(gòu)解密從分布式到微服務(wù)》第七章——聊聊分布式計(jì)算.做了相應(yīng)補(bǔ)充和修改。 [TOC] 前言 不管是網(wǎng)絡(luò)、內(nèi)存、還是存儲(chǔ)的分布式,它們最終目的都是為了實(shí)現(xiàn)計(jì)算的分布式:數(shù)據(jù)在各個(gè)計(jì)算機(jī)節(jié)點(diǎn)上流動(dòng),同...

    Ververica 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<