成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

ZStack源碼剖析之模塊鑒賞——LongJob

cheukyin / 742人閱讀

摘要:因?yàn)檫@個(gè)狀態(tài)下,是交給一個(gè)線程在執(zhí)行的,見源碼剖析之核心庫鑒賞中的分析。并且允許等行為。上面提到過,允許運(yùn)行暫停取消等行為。維護(hù)和相應(yīng)的之間的關(guān)系。則停止執(zhí)行并觸發(fā)之前的所有。

本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog...
前言

在ZStack中,當(dāng)用戶在UI上發(fā)起操作時(shí),前端會(huì)調(diào)用后端的API對(duì)實(shí)際的資源發(fā)起操作請(qǐng)求。但在一個(gè)分布式系統(tǒng)中,我們不能假設(shè)網(wǎng)絡(luò)是可靠的(同樣要面對(duì)的還有單點(diǎn)故障等)——這往往導(dǎo)致API會(huì)超時(shí)。ZStack有默認(rèn)的API超時(shí)機(jī)制,為30mins。但從UI上看來,用戶的體驗(yàn)不是很好,如下:

如果API遇到什么情況而一直沒有響應(yīng),在這里用戶也只能默默等到其超時(shí)。因?yàn)檫@個(gè)狀態(tài)下,API是交給一個(gè)線程在執(zhí)行的,見ZStack源碼剖析之核心庫鑒賞——ThreadFacade中的分析
。最可怕的是,由于隊(duì)列的存在,對(duì)該資源操作的API將全部處于隊(duì)列中而成為等待狀態(tài)。

解決方案

在ZStack 2.3版本開始引入了一個(gè)新的概念——LongJob。這基于ZStack的原有設(shè)計(jì)——FlowChain(我在我的博客中詳細(xì)分析過FlowChain,如果不懂的小伙伴可以點(diǎn)這里),依靠FlowChain,我們把業(yè)務(wù)邏輯拆成一個(gè)個(gè)個(gè)Flow,并設(shè)置對(duì)應(yīng)的RollBack。為了避免之后講起來有點(diǎn)迷,先解釋一下技術(shù)名詞。

LongJob的狀態(tài)是用于被APIQuery的,也提供了進(jìn)度條。并且允許start、stop、cancel等行為。

名詞 LongJob

長任務(wù)。以API可操作的概念具現(xiàn)。上面提到過,允許運(yùn)行、暫停、取消等行為。

LongJobInstance

長任務(wù)實(shí)例。每個(gè)作業(yè)執(zhí)行時(shí),都會(huì)生成一個(gè)實(shí)例,實(shí)例會(huì)存放在LongJobVO這個(gè)數(shù)據(jù)庫表中。便于UI調(diào)用API查看各個(gè)LongJobInstance的狀態(tài)。

Flow

最小的一個(gè)業(yè)務(wù)單元。LongJob的組成,前面說過,LongJob基于FlowChain。

LongJob Parameters

LongJob參數(shù)。用于提交LongJob的參數(shù),不同的參數(shù)可以區(qū)分不同的Job。

數(shù)據(jù)結(jié)構(gòu) LongJobVO
@Entity
@Table
public class LongJobVO extends ResourceVO {
    @Column
    private String name;

    @Column
    private String description;

    @Column
    private String apiId;

    @Column
    private String jobName;

    @Column
    private String jobData;

    @Column
    private String jobResult;

    @Column
    @Enumerated(EnumType.STRING)
    private LongJobState state;

    @Column
    private String targetResourceUuid;

    @Column
    @ForeignKey(parentEntityClass = ManagementNodeVO.class, onDeleteAction = ForeignKey.ReferenceOption.SET_NULL)
    private String managementNodeUuid;

    @Column
    private Timestamp createDate;

    @Column
    private Timestamp lastOpDate;
//忽略get set方法
}

該數(shù)據(jù)結(jié)構(gòu)描述了如下關(guān)鍵信息:

targeResourceUuid - 用以描述 job 針對(duì)的資源,對(duì)于分類查找比較有用。通過 resourceUuid 可以在 ResourceVO 里找到類型。

apiId - 用以查詢?cè)?job 在 TaskProgressVO 中的進(jìn)度信息。

jobName - 執(zhí)行該 job 的 class 名字。參見下面的 JobExecution (類似現(xiàn)有的 AbstractSchedulerJob)

jobData - 存放執(zhí)行該 job 需要的額外參數(shù)信息。

LongJob
public interface LongJob {
    void start(LongJobVO job, Completion completion);
    void cancel(LongJobVO job, Completion completion);
}

所有LongJob都必須實(shí)現(xiàn)該接口,并實(shí)現(xiàn)start/cancel等方法。

LongJobFor
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface LongJobFor {
    Class value();
}

為具體的LongJob增加該注解,表示該LongJob針對(duì)哪個(gè)APIMessage。

比如為BackupStorageMigrateImageJob增加注解:@LongJobFor(APIBackupStorageMigrateImageMsg.class)

LongJobData
interface LongJobData {
}

由于LongJob要復(fù)用現(xiàn)有邏輯以及保證可維護(hù)性,這里處理的代碼和原先邏輯為同一處。handleApiMessage和handleLongJobMessage必須要將所有的參數(shù)抽出來再傳到共用的邏輯層。不僅如此,之后定時(shí)任務(wù)也有可能做成LongJob,故此定義這個(gè)接口。

LongJobMessageData
public class LongJobMessageData implements LongJobData {
    protected final NeedReplyMessage needReplyMessage;
 
    public LongJobMessageData(NeedReplyMessage msg){
        this.needReplyMessage = msg;
    }
 
    public NeedReplyMessage getNeedReplyMessage() {
        return needReplyMessage;
    }
}

該接口實(shí)現(xiàn)了LongJobData(這里L(fēng)ongJobData僅僅用于標(biāo)識(shí)一個(gè)類型),用于完成目前的需求——部分LongJob Feature來自于APIMessage的改進(jìn)。而InnerMessage和APIMessage都繼承于NeedReplyMessage,為加強(qiáng)代碼可讀性,將公用數(shù)據(jù)結(jié)構(gòu)抽取了出來,方便調(diào)用。

LongJobFactory

根據(jù)jobName獲取LongJob實(shí)例。

比如當(dāng)jobName為APIBackupStorageMigrateImageMsg時(shí),獲取BackupStorageMigrateImageJob實(shí)例。

LongJobManager

用以處理 Job 相關(guān)的 API,比如 APICancelJobMsg,APIRestartJobMsg 等等。維護(hù) jobUuid 和相應(yīng)的 CancellableSharedFlowChain 之間的關(guān)系。

CancellableShareFlowChain

繼承 ShareFlowChain,實(shí)現(xiàn) Cancellable。每個(gè) Job 底層邏輯都必須用 CancellableSharedFlowChain 實(shí)現(xiàn)。

詳解 LongJob相關(guān)的API


在圖中我們可以看到LongJob提供了幾個(gè)API,較為重要的是QueryAPI——用戶可以使用它來查詢LongJob的一個(gè)進(jìn)度狀態(tài)。

從白話講起

LongJob則是基于FlowChain之上擴(kuò)展的,首先,每個(gè)LongJob調(diào)用與原有APIMessage行為相同的內(nèi)部Message。我們以APIAddImageMsg為例,看一下它的邏輯。

在這里,我們可以看到Msg們將其的參數(shù)都Copy到了相應(yīng)的LongJobData中,并進(jìn)行傳參,進(jìn)入了一個(gè)統(tǒng)一的入口。這樣便于邏輯的維護(hù),免于和原有的API handle處分為兩段邏輯。

再看調(diào)用實(shí)例

那么是如何調(diào)用的呢?按照老規(guī)矩,我們來看一個(gè)TestCase——AddImageLongJobCase

    void testAddImage() {
        int oldSize = Q.New(ImageVO.class).list().size()
        int flag = 0
        myDescription = "my-test"

        env.afterSimulator(SftpBackupStorageConstant.DOWNLOAD_IMAGE_PATH) { Object response ->
            //DownloadImageMsg
            LongJobVO vo = Q.New(LongJobVO.class).eq(LongJobVO_.description, myDescription).find()
            assert vo.state == LongJobState.Running
            flag += 1
            return response
        }

        APIAddImageMsg msg = new APIAddImageMsg()
        msg.setName("TinyLinux")
        msg.setBackupStorageUuids(Collections.singletonList(bs.uuid))
        msg.setUrl("http://192.168.1.20/share/images/tinylinux.qcow2")
        msg.setFormat(ImageConstant.QCOW2_FORMAT_STRING)
        msg.setMediaType(ImageConstant.ImageMediaType.RootVolumeTemplate.toString())
        msg.setPlatform(ImagePlatform.Linux.toString())

        LongJobInventory jobInv = submitLongJob {
            sessionId = adminSession()
            jobName = "APIAddImageMsg"
            jobData = gson.toJson(msg)
            description = myDescription
        } as LongJobInventory

        assert jobInv.getJobName() == "APIAddImageMsg"
        assert jobInv.state == org.zstack.sdk.LongJobState.Running

        retryInSecs() {
            LongJobVO job = dbFindByUuid(jobInv.getUuid(), LongJobVO.class)
            assert job.state == LongJobState.Succeeded
        }

        int newSize = Q.New(ImageVO.class).count().intValue()
        assert newSize > oldSize
        assert 1 == flag
    }

可以看到本質(zhì)是將原來的APIMsg轉(zhuǎn)為字符串作為LongJob的Data傳入,調(diào)用起來很方便。

實(shí)現(xiàn)

再來看看它的實(shí)現(xiàn),當(dāng)APISubmitLongJobMsg被發(fā)送出去后,handle的地方做了什么呢?見LongJobManagerImpl

    private void handle(APISubmitLongJobMsg msg) {
        // create LongJobVO
        LongJobVO vo = new LongJobVO();
        if (msg.getResourceUuid() != null) {
            vo.setUuid(msg.getResourceUuid());
        } else {
            vo.setUuid(Platform.getUuid());
        }
        if (msg.getName() != null) {
            vo.setName(msg.getName());
        } else {
            vo.setName(msg.getJobName());
        }
        vo.setDescription(msg.getDescription());
        vo.setApiId(msg.getId());
        vo.setJobName(msg.getJobName());
        vo.setJobData(msg.getJobData());
        vo.setState(LongJobState.Waiting);
        vo.setTargetResourceUuid(msg.getTargetResourceUuid());
        vo.setManagementNodeUuid(Platform.getManagementServerId());
        vo = dbf.persistAndRefresh(vo);
        logger.info(String.format("new longjob [uuid:%s, name:%s] has been created", vo.getUuid(), vo.getName()));
        tagMgr.createTagsFromAPICreateMessage(msg, vo.getUuid(), LongJobVO.class.getSimpleName());
        acntMgr.createAccountResourceRef(msg.getSession().getAccountUuid(), vo.getUuid(), LongJobVO.class);
        msg.setJobUuid(vo.getUuid());

        // wait in line
        thdf.chainSubmit(new ChainTask(msg) {
            @Override
            public String getSyncSignature() {
                return "longjob-" + msg.getJobUuid();
            }

            @Override
            public void run(SyncTaskChain chain) {
                APISubmitLongJobEvent evt = new APISubmitLongJobEvent(msg.getId());
                LongJobVO vo = dbf.findByUuid(msg.getJobUuid(), LongJobVO.class);
                vo.setState(LongJobState.Running);
                vo = dbf.updateAndRefresh(vo);
                // launch the long job right now
                ThreadContext.put(Constants.THREAD_CONTEXT_API, vo.getApiId());
                ThreadContext.put(Constants.THREAD_CONTEXT_TASK_NAME, vo.getJobName());
                LongJob job = longJobFactory.getLongJob(vo.getJobName());
                job.start(vo, new Completion(msg) {
                    LongJobVO vo = dbf.findByUuid(msg.getJobUuid(), LongJobVO.class);

                    @Override
                    public void success() {
                        vo.setState(LongJobState.Succeeded);
                        vo.setJobResult("Succeeded");
                        dbf.update(vo);
                        logger.info(String.format("successfully run longjob [uuid:%s, name:%s]", vo.getUuid(), vo.getName()));
                    }

                    @Override
                    public void fail(ErrorCode errorCode) {
                        vo.setState(LongJobState.Failed);
                        vo.setJobResult("Failed : " + errorCode.toString());
                        dbf.update(vo);
                        logger.info(String.format("failed to run longjob [uuid:%s, name:%s]", vo.getUuid(), vo.getName()));
                    }
                });


                evt.setInventory(LongJobInventory.valueOf(vo));
                logger.info(String.format("longjob [uuid:%s, name:%s] has been started", vo.getUuid(), vo.getName()));
                bus.publish(evt);

                chain.next();
            }

            @Override
            public String getName() {
                return getSyncSignature();
            }
        });
    }

這段邏輯大致為:

創(chuàng)建一個(gè)LongJob記錄,以及相關(guān)的SystemTag和賬戶資源管理引用

提交至線程池。使用LongJobFactory獲取一個(gè)LongJob實(shí)例。并執(zhí)行LongJob對(duì)應(yīng)實(shí)現(xiàn)的start,在合適的時(shí)機(jī)進(jìn)行狀態(tài)變化。

LongJobFactory
public class LongJobFactoryImpl implements LongJobFactory, Component {
    private static final CLogger logger = Utils.getLogger(LongJobFactoryImpl.class);
    /**
     * Key:LongJobName
     */
    private TreeMap allLongJob = new TreeMap<>();

    @Override
    public LongJob getLongJob(String jobName) {
        LongJob job = allLongJob.get(jobName);
        if (null == job) {
            throw new OperationFailureException(operr("%s has no corresponding longjob", jobName));
        }
        return job;
    }

    @Override
    public boolean start() {
        LongJob job = null;
        List longJobClasses = BeanUtils.scanClass("org.zstack", LongJobFor.class);
        for (Class it : longJobClasses) {
            LongJobFor at = (LongJobFor) it.getAnnotation(LongJobFor.class);
            try {
                job = (LongJob) it.newInstance();
            } catch (InstantiationException | IllegalAccessException e) {
                e.printStackTrace();
            }
            if (null == job) {
                logger.warn(String.format("[LongJob] class name [%s] but get LongJob instance is null ", at.getClass().getSimpleName()));
                continue;
            }
            logger.debug(String.format("[LongJob] collect class [%s]", job.getClass().getSimpleName()));
            allLongJob.put(at.value().getSimpleName(), job);
        }
        return true;
    }

    @Override
    public boolean stop() {
        allLongJob.clear();
        return true;
    }
}

該FactoryImpl繼承了Component接口。在ZStack Start的時(shí)候會(huì)利用反射收集帶有LongJobFor這個(gè)Annotation的Class。在原先的版本中則是每一次調(diào)用的時(shí)候利用反射去尋找,會(huì)造成一個(gè)不必要的開銷。故此這里也是做了一個(gè)Cache般的改進(jìn),因?yàn)樵贏pplication起來后是不會(huì)動(dòng)態(tài)的去添加一種LongJob的。

回來,還是以AddImageLongJob為例,我們來看看start時(shí)會(huì)做什么,見AddImageLongJob

package org.zstack.image;

import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.zstack.core.Platform;
import org.zstack.core.cloudbus.CloudBus;
import org.zstack.core.cloudbus.CloudBusCallBack;
import org.zstack.core.db.DatabaseFacade;
import org.zstack.header.core.Completion;
import org.zstack.header.image.APIAddImageMsg;
import org.zstack.header.image.AddImageMsg;
import org.zstack.header.image.ImageConstant;
import org.zstack.header.longjob.LongJobFor;
import org.zstack.header.longjob.LongJobVO;
import org.zstack.header.message.MessageReply;
import org.zstack.longjob.LongJob;
import org.zstack.utils.gson.JSONObjectUtil;


/**
 * Created by on camile 2018/2/2.
 */
@LongJobFor(APIAddImageMsg.class)
@Configurable(preConstruction = true, autowire = Autowire.BY_TYPE)
public class AddImageLongJob implements LongJob {
    @Autowired
    protected CloudBus bus;
    @Autowired
    protected DatabaseFacade dbf;

    @Override
    public void start(LongJobVO job, Completion completion) {
        AddImageMsg msg = JSONObjectUtil.toObject(job.getJobData(), AddImageMsg.class);
        bus.makeLocalServiceId(msg, ImageConstant.SERVICE_ID);
        bus.send(msg, new CloudBusCallBack(null) {
            @Override
            public void run(MessageReply reply) {
                if (reply.isSuccess()) {
                    completion.success();
                } else {
                    completion.fail(reply.getError());
                }
            }
        });
    }

    @Override
    public void cancel(LongJobVO job, Completion completion) {
        // TODO
        completion.fail(Platform.operr("not supported"));
    }
}

這里則是發(fā)送了一個(gè)inner msg出去,我們看一下handle處的邏輯:

    private void handle(AddImageMsg msg) {
        AddImageReply evt = new AddImageReply();
        AddImageLongJobData data = new AddImageLongJobData(msg);
        BeanUtils.copyProperties(msg, data);
        handleAddImageMsg(data, evt);
    }

可以看到這里將msg的參數(shù)全部取了出來,放入一個(gè)公共結(jié)構(gòu)里,并傳入了真正的handle處。

APIAddImageMsg也是這么做的:

    private void handle(final APIAddImageMsg msg) {
        APIAddImageEvent evt = new APIAddImageEvent(msg.getId());
        AddImageLongJobData data = new AddImageLongJobData(msg);
        BeanUtils.copyProperties(msg, data);
        handleAddImageMsg(data, evt);
    }

在前面提到過,為了更好的可維護(hù)性,這兩個(gè)Msg共用了一段邏輯。

復(fù)用Intercepter

了解ZStack的同學(xué)都知道,任何一條APIMsg發(fā)送的時(shí)候會(huì)進(jìn)入Intercepter。那么LongJob的submit其實(shí)是把APIMsg作為參數(shù)傳入了,那么如何復(fù)用之前的Intercepter呢?我們來看看LongJobApiInterceptor

public class LongJobApiInterceptor implements ApiMessageInterceptor, Component {
    private static final CLogger logger = Utils.getLogger(LongJobApiInterceptor.class);

    /**
     * Key:LongJobName
     */
    private TreeMap> apiMsgOfLongJob = new TreeMap<>();

    @Override
    public APIMessage intercept(APIMessage msg) throws ApiMessageInterceptionException {
        if (msg instanceof APISubmitLongJobMsg) {
            validate((APISubmitLongJobMsg) msg);
        } else if (msg instanceof APICancelLongJobMsg) {
            validate((APICancelLongJobMsg) msg);
        } else if (msg instanceof APIDeleteLongJobMsg) {
            validate((APIDeleteLongJobMsg) msg);
        }

        return msg;
    }

    private void validate(APISubmitLongJobMsg msg) {
        Class apiClass = apiMsgOfLongJob.get(msg.getJobName());
        if (null == apiClass) {
            throw new ApiMessageInterceptionException(argerr("%s is not an API", msg.getJobName()));
        }
        // validate msg.jobData
        Map config = new HashMap<>();
        List serviceConfigFolders = new ArrayList<>();
        serviceConfigFolders.add("serviceConfig");
        config.put("serviceConfigFolders", serviceConfigFolders);
        ApiMessageProcessor processor = new ApiMessageProcessorImpl(config);
        APIMessage jobMsg = JSONObjectUtil.toObject(msg.getJobData(), apiClass);
        jobMsg.setSession(msg.getSession());
        jobMsg = processor.process(jobMsg);                     // may throw ApiMessageInterceptionException
        msg.setJobData(JSONObjectUtil.toJsonString(jobMsg));    // msg may be changed during validation
    }

    private void validate(APICancelLongJobMsg msg) {
        LongJobState state = Q.New(LongJobVO.class)
                .select(LongJobVO_.state)
                .eq(LongJobVO_.uuid, msg.getUuid())
                .findValue();

        if (state == LongJobState.Succeeded) {
            throw new ApiMessageInterceptionException(argerr("cannot cancel longjob that is succeeded"));
        }
        if (state == LongJobState.Canceled) {
            throw new ApiMessageInterceptionException(argerr("cannot cancel longjob that is already canceled"));
        }
        if (state == LongJobState.Failed) {
            throw new ApiMessageInterceptionException(argerr("cannot cancel longjob that is failed"));
        }
    }

    private void validate(APIDeleteLongJobMsg msg) {
        LongJobState state = Q.New(LongJobVO.class)
                .select(LongJobVO_.state)
                .eq(LongJobVO_.uuid, msg.getUuid())
                .findValue();

        if (state != LongJobState.Succeeded && state != LongJobState.Canceled && state != LongJobState.Failed) {
            throw new ApiMessageInterceptionException(argerr("delete longjob only when it"s succeeded, canceled, or failed"));
        }
    }

    @Override
    public boolean start() {
        Class apiClass = null;
        List longJobClasses = BeanUtils.scanClass("org.zstack", LongJobFor.class);
        for (Class it : longJobClasses) {
            LongJobFor at = (LongJobFor) it.getAnnotation(LongJobFor.class);
            try {
                apiClass = (Class) Class.forName(at.value().getName());
            } catch (ClassNotFoundException | ClassCastException e) {
                //ApiMessage and LongJob are not one by one corresponding ,so we skip it
                e.printStackTrace();
                continue;
            }
            logger.debug(String.format("[LongJob] collect api class [%s]", apiClass.getSimpleName()));
            apiMsgOfLongJob.put(at.value().getSimpleName(), apiClass);
        }
        return true;
    }

    @Override
    public boolean stop() {
        apiMsgOfLongJob.clear();
        return true;
    }
}

邏輯很簡單,通過LongJob的name找出了對(duì)應(yīng)的APIMsg,并將APIMsg發(fā)向了對(duì)應(yīng)Intercepter。

在查找APIMsg這一步也是采用了Cache的思想,在Start的時(shí)候就進(jìn)行了收集。

展望

在前面的定義中,我們提到了LongJob是允許暫停和取消行為的。這在接口中也可以看到類似的期許:

public interface LongJob {
    void start(LongJobVO job, Completion completion);
    void cancel(LongJobVO job, Completion completion);
}

那么該如何實(shí)現(xiàn)它呢?在這里我們僅僅做一個(gè)展望,到時(shí)還是以釋放出來的代碼為準(zhǔn)。

Stop

首先,在CancellableSharedFlowChain 定義一個(gè)必須被實(shí)現(xiàn)的接口。如`stop
Condition`,返回一個(gè)boolean。在每個(gè)Flow執(zhí)行前會(huì)判斷該boolean是否為true,如果為true。則保存context到db,并停止執(zhí)行。

Cancel

同樣,也是在CancellableSharedFlowChain 定義一個(gè)必須被實(shí)現(xiàn)的接口。如cancelCondition,返回一個(gè)boolean。在每個(gè)Flow執(zhí)行前會(huì)判斷該boolean是否為true,如果為true。則停止執(zhí)行并觸發(fā)之前的所有rollback。

Rollback的特殊技巧

那么可能會(huì)有同學(xué)問了,在這樣的設(shè)計(jì)下,如果發(fā)生了如斷電的情況,必然導(dǎo)致無法Rollback。這種情況如果發(fā)生在一個(gè)數(shù)據(jù)中心,可以說是災(zāi)難也不為過。但是我們可以考慮一下如何實(shí)現(xiàn)更具有原子性Rollback。

淺談數(shù)據(jù)庫事務(wù)的實(shí)現(xiàn)

數(shù)據(jù)庫的事務(wù)主要是通過Undo日志來實(shí)現(xiàn)。在一條記錄更新前(更新到硬盤),一定要把相關(guān)的Undo日志寫入硬盤;而“提交事務(wù)”這種記錄,要在記錄更新完畢后再寫入硬盤。所謂的Undo日志,就是沒有操作前的日志。如果同學(xué)們聽完還是覺得有點(diǎn)迷,可以看這篇文章:

碼農(nóng)翻身:愛炫耀的數(shù)據(jù)庫老頭兒

可以考慮的方案

在了解了數(shù)據(jù)庫事務(wù)的實(shí)現(xiàn)后,我們可以大致設(shè)計(jì)出一種方案,用于保證斷電后Rollback的完整性:

在一個(gè)FlowChain執(zhí)行前,在DB里存入一個(gè)類似Start FlowChain的標(biāo)記

定義每一個(gè)Flow的Number號(hào),如第一個(gè)Flow為1。在Flow執(zhí)行前,記錄當(dāng)前Flow Number到數(shù)據(jù)庫,寫Flow1開始執(zhí)行。Flow執(zhí)行完之前,寫Flow1執(zhí)行完畢。

Flow執(zhí)行完了,在DB里存入一個(gè)類似Done FlowChian的標(biāo)記。這里我們把Done的那部分也看做一個(gè)Flow。

那么在任何以步驟出問題的時(shí)候,基本都可以完成一個(gè)Rollback。我們來看一看:

還沒執(zhí)行Flow的時(shí)候斷電

DB中的記錄為Start FlowChain,那么是不需要Rollback的。

執(zhí)行一個(gè)Flow的時(shí)候斷電

DB中的最新記錄為Flow1開始執(zhí)行的話,不需要Rollback。這種分布式場(chǎng)景下如果需要做到強(qiáng)一致性,只能對(duì)每行代碼做類似Undo日志的記錄了。

但是如果記錄為Flow1執(zhí)行完畢,開始Rollback。

之后執(zhí)行幾個(gè)Flow都是參考這里的一個(gè)做法。

小結(jié)

在本文中,筆者和大家了解了ZStack在2.3引入的新模塊——LongJob。并對(duì)其的出現(xiàn)的背景、解決的痛點(diǎn)和實(shí)現(xiàn)進(jìn)行了分析,最后展望了一下接下來版本中可能會(huì)增強(qiáng)的功能。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/68745.html

相關(guān)文章

  • ZStack源碼剖析二次開發(fā)——可擴(kuò)展框架

    摘要:但在實(shí)際的二次開發(fā)中,這些做法未必能夠完全滿足需求。在源碼剖析之核心庫鑒賞一文中,我們了解到是的基礎(chǔ)設(shè)施之一,同時(shí)也允許通過顯示聲明的方式來聲明。同理,一些也可以使用繼承進(jìn)行擴(kuò)展。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 前言 在ZStack博文-5.通用插件系統(tǒng)中,官方提出了幾個(gè)較為經(jīng)典的擴(kuò)展方式。但在實(shí)際的二次開發(fā)中,這些做法未必...

    lolomaco 評(píng)論0 收藏0
  • ZStack源碼剖析設(shè)計(jì)模式鑒賞——三駕馬車

    摘要:但新增模塊的結(jié)構(gòu)卻還是大致相同,此即是的經(jīng)典設(shè)計(jì)模式這套模式也被開發(fā)者稱為三駕馬車。領(lǐng)域?qū)佣x負(fù)責(zé)表達(dá)業(yè)務(wù)概念,業(yè)務(wù)狀態(tài)信息以及業(yè)務(wù)規(guī)則。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 前言 隨著ZStack的版本迭代,其可以掌管的資源也越來越多。但新增模塊的結(jié)構(gòu)卻還是大致相同,此即是ZStack的經(jīng)典設(shè)計(jì)模式——這套模式也被開發(fā)者稱為ZS...

    honhon 評(píng)論0 收藏0
  • ZStack源碼剖析核心庫鑒賞——Defer

    摘要:本文首發(fā)于泊浮目的專欄在語言中,有一個(gè)關(guān)鍵字叫做其作用是在函數(shù)前執(zhí)行。一般有兩種用法在該函數(shù)拋出異常時(shí)執(zhí)行。在該函數(shù)返回前執(zhí)行。這里的放入來自系統(tǒng)啟動(dòng)時(shí)利用反射所做的一個(gè)行為。因此并不會(huì)影響使用時(shí)的性能。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 在Go語言中,有一個(gè)關(guān)鍵字叫做defer——其作用是在函數(shù)return前執(zhí)行。在ZStac...

    DevWiki 評(píng)論0 收藏0
  • ZStack源碼剖析核心庫鑒賞——Defer

    摘要:本文首發(fā)于泊浮目的專欄在語言中,有一個(gè)關(guān)鍵字叫做其作用是在函數(shù)前執(zhí)行。一般有兩種用法在該函數(shù)拋出異常時(shí)執(zhí)行。在該函數(shù)返回前執(zhí)行。這里的放入來自系統(tǒng)啟動(dòng)時(shí)利用反射所做的一個(gè)行為。因此并不會(huì)影響使用時(shí)的性能。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 在Go語言中,有一個(gè)關(guān)鍵字叫做defer——其作用是在函數(shù)return前執(zhí)行。在ZStac...

    ymyang 評(píng)論0 收藏0
  • ZStack源碼剖析設(shè)計(jì)模式鑒賞——策略模式

    摘要:能夠整體地替換算法,能讓我們輕松地以不同的算法去解決一個(gè)問題,這種模式就是模式。這個(gè)類是在發(fā)布前常在中被使用的一個(gè)類,代碼如下以為例,從語義上來說就是為了中的每個(gè)元素調(diào)用函數(shù)。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 前言 無論什么程序,其目的都是解決問題。而為了解決問題,我們又需要編寫特定的算法。使用Strategy模式可以整體地替...

    Eric 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<