成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

Vert.x Blueprint 系列教程(二) | 開(kāi)發(fā)基于消息的應(yīng)用 - Vert.x Kue

elina / 3102人閱讀

摘要:本文章是藍(lán)圖系列的第二篇教程。這就是請(qǐng)求回應(yīng)模式。好多屬性我們一個(gè)一個(gè)地解釋一個(gè)序列,作為的地址任務(wù)的編號(hào)任務(wù)的類(lèi)型任務(wù)攜帶的數(shù)據(jù),以類(lèi)型表示任務(wù)優(yōu)先級(jí),以枚舉類(lèi)型表示。默認(rèn)優(yōu)先級(jí)為正常任務(wù)的延遲時(shí)間,默認(rèn)是任務(wù)狀態(tài),以枚舉類(lèi)型表示。

本文章是 Vert.x 藍(lán)圖系列 的第二篇教程。全系列:

Vert.x Blueprint 系列教程(一) | 待辦事項(xiàng)服務(wù)開(kāi)發(fā)教程

Vert.x Blueprint 系列教程(二) | 開(kāi)發(fā)基于消息的應(yīng)用 - Vert.x Kue 教程

Vert.x Blueprint 系列教程(三) | Micro-Shop 微服務(wù)應(yīng)用實(shí)戰(zhàn)

本系列已發(fā)布至Vert.x官網(wǎng):Vert.x Blueprint Tutorials

前言

歡迎回到Vert.x 藍(lán)圖系列~在本教程中,我們將利用Vert.x開(kāi)發(fā)一個(gè)基于消息的應(yīng)用 - Vert.x Kue,它是一個(gè)使用Vert.x開(kāi)發(fā)的優(yōu)先級(jí)工作隊(duì)列,數(shù)據(jù)存儲(chǔ)使用的是 Redis。Vert.x Kue是 Automattic/kue 的Vert.x實(shí)現(xiàn)版本。我們可以使用Vert.x Kue來(lái)處理各種各樣的任務(wù),比如文件轉(zhuǎn)換、訂單處理等等。

通過(guò)本教程,你將會(huì)學(xué)習(xí)到以下內(nèi)容:

消息、消息系統(tǒng)以及事件驅(qū)動(dòng)的運(yùn)用

Vert.x Event Bus 的幾種事件機(jī)制(發(fā)布/訂閱、點(diǎn)對(duì)點(diǎn)模式)

設(shè)計(jì) 分布式 的Vert.x應(yīng)用

工作隊(duì)列的設(shè)計(jì)

Vert.x Service Proxy(服務(wù)代理,即異步RPC)的運(yùn)用

更深層次的Redis運(yùn)用

本教程是Vert.x 藍(lán)圖系列的第二篇教程,對(duì)應(yīng)的Vert.x版本為3.3.2。本教程中的完整代碼已托管至GitHub。

Vert.x的消息系統(tǒng)

既然我們要用Vert.x開(kāi)發(fā)一個(gè)基于消息的應(yīng)用,那么我們先來(lái)瞅一瞅Vert.x的消息系統(tǒng)吧~在Vert.x中,我們可以通過(guò) Event Bus 來(lái)發(fā)送和接收各種各樣的消息,這些消息可以來(lái)自不同的Vertx實(shí)例。怎么樣,很酷吧?我們都將消息發(fā)送至Event Bus上的某個(gè)地址上,這個(gè)地址可以是任意的字符串。

Event Bus支持三種消息機(jī)制:發(fā)布/訂閱(Publish/Subscribe)、點(diǎn)對(duì)點(diǎn)(Point to point)以及請(qǐng)求/回應(yīng)(Request-Response)模式。下面我們就來(lái)看一看這幾種機(jī)制。

發(fā)布/訂閱模式

發(fā)布/訂閱模式中,消息被發(fā)布到Event Bus的某一個(gè)地址上,所有訂閱此地址的Handler都會(huì)接收到該消息并且調(diào)用相應(yīng)的處理邏輯。我們來(lái)看一看示例代碼:

EventBus eventBus = vertx.eventBus();

eventBus.consumer("foo.bar.baz", r -> { // subscribe to `foo.bar.baz` address
  System.out.println("1: " + r.body());
});
eventBus.consumer("foo.bar.baz", r -> { // subscribe to `foo.bar.baz` address
  System.out.println("2: " + r.body());
});

eventBus.publish("foo.bar.baz", "+1s"); // 向此地址發(fā)送消息

我們可以通過(guò)vertx.eventBus()方法獲取EventBus的引用,然后我們就可以通過(guò)consume方法訂閱某個(gè)地址的消息并且綁定一個(gè)Handler。接著我們通過(guò)publish向此地址發(fā)送消息。如果運(yùn)行上面的例子,我們會(huì)得到一下結(jié)果:

2: +1s
1: +1s
點(diǎn)對(duì)點(diǎn)模式

如果我們把上面的示例中的publish方法替代成send方法,上面的實(shí)例就變成點(diǎn)對(duì)點(diǎn)模式了。在點(diǎn)對(duì)點(diǎn)模式中,消息被發(fā)布到Event Bus的某一個(gè)地址上。Vert.x會(huì)將此消息傳遞給其中監(jiān)聽(tīng)此地址的Handler之一。如果有多個(gè)Handler綁定到此地址,那么就使用輪詢(xún)算法隨機(jī)挑一個(gè)Handler傳遞消息。比如在此示例中,程序只會(huì)打印2: +1s或者1: +1s之中的一個(gè)。

請(qǐng)求/回應(yīng)模式

當(dāng)我們綁定的Handler接收到消息的時(shí)候,我們可不可以給消息的發(fā)送者回復(fù)呢?當(dāng)然了!當(dāng)我們通過(guò)send方法發(fā)送消息的時(shí)候,我們可以同時(shí)指定一個(gè)回復(fù)處理函數(shù)(reply handler)。然后當(dāng)某個(gè)消息的訂閱者接收到消息的時(shí)候,它就可以給發(fā)送者回復(fù)消息;如果發(fā)送者接收到了回復(fù),發(fā)送者綁定的回復(fù)處理函數(shù)就會(huì)被調(diào)用。這就是請(qǐng)求/回應(yīng)模式

好啦,現(xiàn)在我們已經(jīng)粗略了解了Vert.x中的消息系統(tǒng) - Event Bus的基本使用,下面我們就看看Vert.x Kue的基本設(shè)計(jì)。有關(guān)更多關(guān)于Event Bus的信息請(qǐng)參考Vert.x Core Manual - Event Bus。

Vert.x Kue 架構(gòu)設(shè)計(jì) Vert.x Kue 組件劃分

在我們的項(xiàng)目中,我們將Vert.x Kue劃分為兩個(gè)模塊:

kue-core: 核心組件,提供優(yōu)先級(jí)隊(duì)列的功能

kue-http: Web組件,提供Web UI以及REST API

另外我們還提供一個(gè)示例模塊kue-example用于演示以及闡述如何使用Vert.x Kue。

既然我們的項(xiàng)目有兩個(gè)模塊,那么你一定會(huì)好奇:兩個(gè)模塊之間是如何進(jìn)行通信的?并且如果我們寫(xiě)自己的Kue應(yīng)用的話,我們?cè)撛鯓尤フ{(diào)用Kue Core中的服務(wù)呢?不要著急,謎底將在后邊的章節(jié)中揭曉:-)

Vert.x Kue 核心模塊

回顧一下Vert.x Kue的作用 - 優(yōu)先級(jí)工作隊(duì)列,所以在Vert.x Kue的核心模塊中我們?cè)O(shè)計(jì)了以下的類(lèi):

Job - 任務(wù)(作業(yè))數(shù)據(jù)實(shí)體

JobService - 異步服務(wù)接口,提供操作任務(wù)以及獲取數(shù)據(jù)的相關(guān)邏輯

KueWorker - 用于處理任務(wù)的Verticle

Kue - 工作隊(duì)列

前邊我們提到過(guò),我們的兩個(gè)組件之間需要一種通信機(jī)制可以互相通信 - 這里我們使用Vert.x的集群模式,即以clustered的模式來(lái)部署Verticle。這樣的環(huán)境下的Event Bus同樣也是集群模式的,因此各個(gè)組件可以通過(guò)集群模式下的Event Bus進(jìn)行通信。很不錯(cuò)吧?在Vert.x的集群模式下,我們需要指定一個(gè)集群管理器ClusterManager。這里我們使用默認(rèn)的HazelcastClusterManager,使用 Hazelcast 作為集群管理。

在Vert.x Kue中,我們將JobService服務(wù)發(fā)布至分布式的Event Bus上,這樣其它的組件就可以通過(guò)Event Bus調(diào)用該服務(wù)了。我們?cè)O(shè)計(jì)了一個(gè)KueVerticle用于注冊(cè)服務(wù)。Vert.x提供了Vert.x Service Proxy(服務(wù)代理組件),可以很方便地將服務(wù)注冊(cè)至Event Bus上,然后在其它地方獲取此服務(wù)的代理并調(diào)用。我們將在下面的章節(jié)中詳細(xì)介紹Vert.x Service Proxy。

基于Future的異步模式

在我們的Vert.x Kue中,大多數(shù)的異步方法都是基于Future的。如果您看過(guò)藍(lán)圖系列的第一篇文章的話,您一定不會(huì)對(duì)這種模式很陌生。在Vert.x 3.3中,我們的Future支持基本的響應(yīng)式的操作,比如mapcompose。它們用起來(lái)非常方便,因?yàn)槲覀兛梢詫⒍鄠€(gè)Future以響應(yīng)式的方式組合起來(lái)而不用擔(dān)心陷入回調(diào)地獄中。

Vert.x Kue中的事件

正如我們?cè)赩ert.x Kue 特性介紹中提到的那樣,Vert.x Kue支持兩種級(jí)別的事件:任務(wù)事件(job events) 以及 隊(duì)列事件(queue events)。在Vert.x Kue中,我們?cè)O(shè)計(jì)了三種事件地址:

vertx.kue.handler.job.{handlerType}.{addressId}.{jobType}: 某個(gè)特定任務(wù)的任務(wù)事件地址

vertx.kue.handler.workers.{eventType}: (全局)隊(duì)列事件地址

vertx.kue.handler.workers.{eventType}.{addressId}: 某個(gè)特定任務(wù)的內(nèi)部事件地址

在特性介紹文檔中,我們提到了以下幾種任務(wù)事件:

start 開(kāi)始處理一個(gè)任務(wù) (onStart)

promotion 一個(gè)延期的任務(wù)時(shí)間已到,提升至工作隊(duì)列中 (onPromotion)

progress 任務(wù)的進(jìn)度變化 (onProgress)

failed_attempt 任務(wù)處理失敗,但是還可以重試 (onFailureAttempt)

failed 任務(wù)處理失敗并且不能重試 (onFailure)

complete 任務(wù)完成 (onComplete)

remove 任務(wù)從后端存儲(chǔ)中移除 (onRemove)

隊(duì)列事件也相似,只不過(guò)需要加前綴job_。這些事件都會(huì)通過(guò)send方法發(fā)送至Event Bus上。每一個(gè)任務(wù)都有對(duì)應(yīng)的任務(wù)事件地址,因此它們能夠正確地接收到對(duì)應(yīng)的事件并進(jìn)行相應(yīng)的處理邏輯。

特別地,我們還有兩個(gè)內(nèi)部事件:donedone_fail。done事件對(duì)應(yīng)一個(gè)任務(wù)在底層的處理已經(jīng)完成,而done_fail事件對(duì)應(yīng)一個(gè)任務(wù)在底層的處理失敗。這兩個(gè)事件使用第三種地址進(jìn)行傳遞。

任務(wù)狀態(tài)

在Vert.x Kue中,任務(wù)共有五種狀態(tài):

INACTIVE: 任務(wù)還未開(kāi)始處理,在工作隊(duì)列中等待處理

ACTIVE: 任務(wù)正在處理中

COMPLETE: 任務(wù)處理完成

FAILED: 任務(wù)處理失敗

DELAYED: 任務(wù)延時(shí)處理,正在等待計(jì)時(shí)器時(shí)間到并提升至工作隊(duì)列中

我們使用狀態(tài)圖來(lái)描述任務(wù)狀態(tài)的變化:

以及任務(wù)狀態(tài)的變化伴隨的事件:

整體設(shè)計(jì)

為了讓大家對(duì)Vert.x Kue的架構(gòu)有大致的了解,我用一幅圖來(lái)簡(jiǎn)略描述整個(gè)Vert.x Kue的設(shè)計(jì):

現(xiàn)在我們對(duì)Vert.x Kue的設(shè)計(jì)有了大致的了解了,下面我們就來(lái)看一看Vert.x Kue的代碼實(shí)現(xiàn)了~

項(xiàng)目結(jié)構(gòu)

我們來(lái)開(kāi)始探索Vert.x Kue的旅程吧!首先我們先從GitHub上clone源代碼:

git clone https://github.com/sczyh30/vertx-blueprint-job-queue.git

然后你可以把項(xiàng)目作為Gradle項(xiàng)目導(dǎo)入你的IDE中。(如何導(dǎo)入請(qǐng)參考相關(guān)IDE幫助文檔)

正如我們之前所提到的,我們的Vert.x Kue中有兩個(gè)功能模塊和一個(gè)實(shí)例模塊,因此我們需要在Gradle工程文件中定義三個(gè)子工程。我們來(lái)看一下本項(xiàng)目中的build.gradle文件:

configure(allprojects) { project ->

  ext {
    vertxVersion = "3.3.2"
  }

  apply plugin: "java"

  repositories {
    jcenter()
  }

  dependencies {
    compile("io.vertx:vertx-core:${vertxVersion}")
    compile("io.vertx:vertx-codegen:${vertxVersion}")
    compile("io.vertx:vertx-rx-java:${vertxVersion}")
    compile("io.vertx:vertx-hazelcast:${vertxVersion}")
    compile("io.vertx:vertx-lang-ruby:${vertxVersion}")

    testCompile("io.vertx:vertx-unit:${vertxVersion}")
    testCompile group: "junit", name: "junit", version: "4.12"
  }

  sourceSets {
    main {
      java {
        srcDirs += "src/main/generated"
      }
    }
  }

  compileJava {
    targetCompatibility = 1.8
    sourceCompatibility = 1.8
  }
}

project("kue-core") {

  dependencies {
    compile("io.vertx:vertx-redis-client:${vertxVersion}")
    compile("io.vertx:vertx-service-proxy:${vertxVersion}")
  }

  jar {
    archiveName = "vertx-blueprint-kue-core.jar"
    from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
    manifest {
      attributes "Main-Class": "io.vertx.core.Launcher"
      attributes "Main-Verticle": "io.vertx.blueprint.kue.queue.KueVerticle"
    }
  }

  task annotationProcessing(type: JavaCompile, group: "build") { // codegen
    source = sourceSets.main.java
    classpath = configurations.compile
    destinationDir = project.file("src/main/generated")
    options.compilerArgs = [
      "-proc:only",
      "-processor", "io.vertx.codegen.CodeGenProcessor",
      "-AoutputDirectory=${project.projectDir}/src/main"
    ]
  }

  compileJava {
    targetCompatibility = 1.8
    sourceCompatibility = 1.8

    dependsOn annotationProcessing
  }
}

project("kue-http") {

  dependencies {
    compile(project(":kue-core"))
    compile("io.vertx:vertx-web:${vertxVersion}")
    compile("io.vertx:vertx-web-templ-jade:${vertxVersion}")
  }

  jar {
    archiveName = "vertx-blueprint-kue-http.jar"
    from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
    manifest {
      attributes "Main-Class": "io.vertx.core.Launcher"
      attributes "Main-Verticle": "io.vertx.blueprint.kue.http.KueHttpVerticle"
    }
  }
}

project("kue-example") {

  dependencies {
    compile(project(":kue-core"))
  }

  jar {
    archiveName = "vertx-blueprint-kue-example.jar"
    from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
    manifest {
      attributes "Main-Class": "io.vertx.core.Launcher"
      attributes "Main-Verticle": "io.vertx.blueprint.kue.example.LearningVertxVerticle"
    }
  }
}

task wrapper(type: Wrapper) {
  gradleVersion = "2.12"
}

(⊙o⊙)…比之前的待辦事項(xiàng)服務(wù)項(xiàng)目中的長(zhǎng)不少誒。。。我們來(lái)解釋一下:

configure(allprojects)作用域中,我們配置了一些全局信息(對(duì)所有子工程都適用)。

我們定義了三個(gè)子工程:kue-core、kue-http以及kue-example。這里我們來(lái)解釋一下里面用到的依賴(lài)。在kue-core中,vertx-redis-client用于Redis通信,vertx-service-proxy用于Event Bus上的服務(wù)代理。在kue-http中,我們將kue-core子工程作為它的一個(gè)依賴(lài)。vertx-webvertx-web-templ-jade用于Kue Web端的開(kāi)發(fā)。

任務(wù)annotationProcessing用于注解處理(Vert.x Codegen)。我們已經(jīng)在上一篇教程中介紹過(guò)了,這里就不展開(kāi)講了。

我們還需要在 settings.gradle 中配置工程:

rootProject.name = "vertx-blueprint-job-queue"

include "kue-core"
include "kue-http"
include "kue-example"

看完了配置文件以后,我們?cè)賮?lái)瀏覽一下我們的項(xiàng)目目錄結(jié)構(gòu):

.
├── build.gradle
├── kue-core
│?? └── src
│??     ├── main
│??     │?? ├── java
│??     │?? └── resources
│??     └── test
│??         ├── java
│??         └── resources
├── kue-example
│?? └── src
│??     ├── main
│??     │?? ├── java
│??     │?? └── resources
│??     └── test
│??         ├── java
│??         └── resources
├── kue-http
│?? └── src
│??     ├── main
│??     │?? ├── java
│??     │?? └── resources
│??     └── test
│??         ├── java
│??         └── resources
└── settings.gradle

在Gradle中,項(xiàng)目的源碼都位于{projectName}/src/main/java目錄內(nèi)。這篇教程是圍繞Vert.x Kue Core的,所以我們的代碼都在kue-core目錄中。

好啦!現(xiàn)在我們已經(jīng)對(duì)Vert.x Kue項(xiàng)目的整體結(jié)構(gòu)有了大致的了解了,下面我們開(kāi)始源碼探索之旅!

任務(wù)實(shí)體 - 不僅僅是一個(gè)數(shù)據(jù)對(duì)象

Vert.x Kue是用來(lái)處理任務(wù)的,因此我們先來(lái)看一下代表任務(wù)實(shí)體的Job類(lèi)。Job類(lèi)位于io.vertx.blueprint.kue.queue包下。代碼可能有點(diǎn)長(zhǎng),不要擔(dān)心,我們把它分成幾部分,分別來(lái)解析。

任務(wù)成員屬性

我們先來(lái)看一下Job類(lèi)中的成員屬性:

@DataObject(generateConverter = true)
public class Job {
    // job properties

    private final String address_id;
    private long id = -1;
    private String zid;
    private String type;
    private JsonObject data;
    private Priority priority = Priority.NORMAL;
    private JobState state = JobState.INACTIVE;
    private long delay = 0;
    private int max_attempts = 1;
    private boolean removeOnComplete = false;
    private int ttl = 0;
    private JsonObject backoff;

    private int attempts = 0;
    private int progress = 0;
    private JsonObject result;

    // job metrics
    private long created_at;
    private long promote_at;
    private long updated_at;
    private long failed_at;
    private long started_at;
    private long duration;


    // ...
}

我去。。。好多屬性!我們一個(gè)一個(gè)地解釋?zhuān)?/p>

address_id: 一個(gè)UUID序列,作為Event Bus的地址

id: 任務(wù)的編號(hào)(id)

type: 任務(wù)的類(lèi)型

data: 任務(wù)攜帶的數(shù)據(jù),以 JsonObject 類(lèi)型表示

priority: 任務(wù)優(yōu)先級(jí),以 Priority 枚舉類(lèi)型表示。默認(rèn)優(yōu)先級(jí)為正常(NORMAL)

delay: 任務(wù)的延遲時(shí)間,默認(rèn)是 0

state: 任務(wù)狀態(tài),以 JobState 枚舉類(lèi)型表示。默認(rèn)狀態(tài)為等待(INACTIVE)

attempts: 任務(wù)已經(jīng)嘗試執(zhí)行的次數(shù)

max_attempts: 任務(wù)嘗試執(zhí)行次數(shù)的最大閾值

removeOnComplete: 代表任務(wù)完成時(shí)是否自動(dòng)從后臺(tái)移除

zid: zset操作對(duì)應(yīng)的編號(hào)(zid),保持先進(jìn)先出順序

ttl: TTL(Time to live)

backoff: 任務(wù)重試配置,以 JsonObject 類(lèi)型表示

progress: 任務(wù)執(zhí)行的進(jìn)度

result: 任務(wù)執(zhí)行的結(jié)果,以 JsonObject 類(lèi)型表示

還有這些統(tǒng)計(jì)數(shù)據(jù):

created_at: 代表此任務(wù)創(chuàng)建的時(shí)間

promote_at: 代表此任務(wù)從延時(shí)狀態(tài)被提升至等待狀態(tài)時(shí)的時(shí)間

updated_at: 代表任務(wù)更新的時(shí)間

failed_at: 代表任務(wù)失敗的時(shí)間

started_at: 代表任務(wù)開(kāi)始的時(shí)間

duration: 代表處理任務(wù)花費(fèi)的時(shí)間,單位為毫秒(ms)

你可能注意到在 Job 類(lèi)中還存在著幾個(gè)靜態(tài)成員變量:

private static Logger logger = LoggerFactory.getLogger(Job.class);

private static Vertx vertx;
private static RedisClient client;
private static EventBus eventBus;

public static void setVertx(Vertx v, RedisClient redisClient) {
  vertx = v;
  client = redisClient;
  eventBus = vertx.eventBus();
}

對(duì)于 logger 對(duì)象,我想大家應(yīng)該都很熟悉,它代表一個(gè)Vert.x Logger實(shí)例用于日志記錄。但是你一定想問(wèn)為什么 Job 類(lèi)中存在著一個(gè)Vertx類(lèi)型的靜態(tài)成員。Job類(lèi)不應(yīng)該是一個(gè)數(shù)據(jù)對(duì)象嗎?當(dāng)然咯!Job類(lèi)代表一個(gè)數(shù)據(jù)對(duì)象,但不僅僅是一個(gè)數(shù)據(jù)對(duì)象。這里我模仿了一些Automattic/kue的風(fēng)格,把一些任務(wù)相關(guān)邏輯方法放到了Job類(lèi)里,它們大多都是基于Future的異步方法,因此可以很方便地去調(diào)用以及進(jìn)行組合變換。比如:

job.save()
    .compose(Job::updateNow)
    .compose(j -> j.log("good!"));

由于我們不能在Job類(lèi)被JVM加載的時(shí)候就獲取Vertx實(shí)例,我們必須手動(dòng)給Job類(lèi)中的靜態(tài)Vertx成員賦值。這里我們是在Kue類(lèi)中對(duì)其進(jìn)行賦值的。當(dāng)我們創(chuàng)建一個(gè)工作隊(duì)列的時(shí)候,Job類(lèi)中的靜態(tài)成員變量會(huì)被初始化。同時(shí)為了保證程序的正確性,我們需要一個(gè)方法來(lái)檢測(cè)靜態(tài)成員變量是否初始化。當(dāng)我們?cè)趧?chuàng)建一個(gè)任務(wù)的時(shí)候,如果靜態(tài)成員此時(shí)未被初始化,那么日志會(huì)給出警告:

private void _checkStatic() {
  if (vertx == null) {
    logger.warn("static Vertx instance in Job class is not initialized!");
  }
}

我們還注意到 Job 類(lèi)也是由@DataObject注解修飾的。Vert.x Codegen可以處理含有@DataObject注解的類(lèi)并生成對(duì)應(yīng)的JSON轉(zhuǎn)換器,并且Vert.x Service Proxy也需要數(shù)據(jù)對(duì)象。

Job類(lèi)中我們有四個(gè)構(gòu)造函數(shù)。其中address_id成員必須在一個(gè)任務(wù)被創(chuàng)建時(shí)就被賦值,默認(rèn)情況下此地址用一個(gè)唯一的UUID字符串表示。每一個(gè)構(gòu)造函數(shù)中我們都要調(diào)用_checkStatic函數(shù)來(lái)檢測(cè)靜態(tài)成員變量是否被初始化。

任務(wù)事件輔助函數(shù)

正如我們之前所提到的那樣,我們通過(guò)一個(gè)特定的地址vertx.kue.handler.job.{handlerType}.{addressId}.{jobType}在分布式的Event Bus上發(fā)送和接收任務(wù)事件(job events)。所以我們提供了兩個(gè)用于發(fā)送和接收事件的輔助函數(shù)emiton(類(lèi)似于Node.js中的EventEmitter):

@Fluent
public  Job on(String event, Handler> handler) {
  logger.debug("[LOG] On: " + Kue.getCertainJobAddress(event, this));
  eventBus.consumer(Kue.getCertainJobAddress(event, this), handler);
  return this;
}

@Fluent
public Job emit(String event, Object msg) {
  logger.debug("[LOG] Emit: " + Kue.getCertainJobAddress(event, this));
  eventBus.send(Kue.getCertainJobAddress(event, this), msg);
  return this;
}

在后面的代碼中,我們將頻繁使用這兩個(gè)輔助函數(shù)。

Redis中的存儲(chǔ)形式

在我們探索相關(guān)的邏輯函數(shù)之前,我們先來(lái)描述一下Vert.x Kue的數(shù)據(jù)在Redis中是以什么樣的形式存儲(chǔ)的:

所有的key都在vertx_kue命名空間下(以vertx_kue:作為前綴)

vertx:kue:job:{id}: 存儲(chǔ)任務(wù)實(shí)體的map

vertx:kue:ids: 計(jì)數(shù)器,指示當(dāng)前最大的任務(wù)ID

vertx:kue:job:types: 存儲(chǔ)所有任務(wù)類(lèi)型的列表

vertx:kue:{type}:jobs: 指示所有等待狀態(tài)下的某種類(lèi)型任務(wù)的列表

vertx_kue:jobs: 存儲(chǔ)所有任務(wù)zid的有序集合

vertx_kue:job:{state}: 存儲(chǔ)所有指定狀態(tài)的任務(wù)zid的有序集合

vertx_kue:jobs:{type}:{state}: 存儲(chǔ)所有指定狀態(tài)和類(lèi)型的任務(wù)zid的有序集合

vertx:kue:job:{id}:log: 存儲(chǔ)指定id的任務(wù)對(duì)應(yīng)日志的列表

OK,下面我們就來(lái)看看Job類(lèi)中重要的邏輯函數(shù)。

改變?nèi)蝿?wù)狀態(tài)

我們之前提到過(guò),Vert.x Kue中的任務(wù)一共有五種狀態(tài)。所有的任務(wù)相關(guān)的操作都伴隨著任務(wù)狀態(tài)的變換,因此我們先來(lái)看一下state方法的實(shí)現(xiàn),它用于改變?nèi)蝿?wù)的狀態(tài):

public Future state(JobState newState) {
  Future future = Future.future();
  RedisClient client = RedisHelper.client(vertx, new JsonObject()); // use a new client to keep transaction
  JobState oldState = this.state;
  client.transaction().multi(r0 -> { // (1)
    if (r0.succeeded()) {
      if (oldState != null && !oldState.equals(newState)) { // (2)
        client.transaction().zrem(RedisHelper.getStateKey(oldState), this.zid, _failure())
          .zrem(RedisHelper.getKey("jobs:" + this.type + ":" + oldState.name()), this.zid, _failure());
      }
      client.transaction().hset(RedisHelper.getKey("job:" + this.id), "state", newState.name(), _failure()) // (3)
        .zadd(RedisHelper.getKey("jobs:" + newState.name()), this.priority.getValue(), this.zid, _failure())
        .zadd(RedisHelper.getKey("jobs:" + this.type + ":" + newState.name()), this.priority.getValue(), this.zid, _failure());

      switch (newState) { // dispatch different state
        case ACTIVE: // (4)
          client.transaction().zadd(RedisHelper.getKey("jobs:" + newState.name()),
            this.priority.getValue() < 0 ? this.priority.getValue() : -this.priority.getValue(),
            this.zid, _failure());
          break;
        case DELAYED: // (5)
          client.transaction().zadd(RedisHelper.getKey("jobs:" + newState.name()),
            this.promote_at, this.zid, _failure());
          break;
        case INACTIVE: // (6)
          client.transaction().lpush(RedisHelper.getKey(this.type + ":jobs"), "1", _failure());
          break;
        default:
      }

      this.state = newState;

      client.transaction().exec(r -> { // (7)
        if (r.succeeded()) {
          future.complete(this);
        } else {
          future.fail(r.cause());
        }
      });
    } else {
      future.fail(r0.cause());
    }
  });

  return future.compose(Job::updateNow);
}

首先我們先創(chuàng)建了一個(gè)Future對(duì)象。然后我們調(diào)用了 client.transaction().multi(handler) 函數(shù)開(kāi)始一次Redis事務(wù) (1)。在Vert.x 3.3.2中,所有的Redis事務(wù)操作都移至RedisTransaction類(lèi)中,所以我們需要先調(diào)用client.transaction()方法去獲取一個(gè)事務(wù)實(shí)例,然后調(diào)用multi代表事務(wù)塊的開(kāi)始。

multi函數(shù)傳入的Handler中,我們先判定當(dāng)前的任務(wù)狀態(tài)。如果當(dāng)前任務(wù)狀態(tài)不為空并且不等于新的任務(wù)狀態(tài),我們就將Redis中存儲(chǔ)的舊的狀態(tài)信息移除 (2)。為了方便起見(jiàn),我們提供了一個(gè)RedisHelper輔助類(lèi),里面提供了一些生成特定地址以及編碼解碼zid的方法:

package io.vertx.blueprint.kue.util;

import io.vertx.blueprint.kue.queue.JobState;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.redis.RedisClient;
import io.vertx.redis.RedisOptions;


public final class RedisHelper {

  private static final String VERTX_KUE_REDIS_PREFIX = "vertx_kue";

  private RedisHelper() {
  }

  public static RedisClient client(Vertx vertx, JsonObject config) {
    return RedisClient.create(vertx, options(config));
  }

  public static RedisOptions options(JsonObject config) {
    return new RedisOptions()
      .setHost(config.getString("redis.host", "127.0.0.1"))
      .setPort(config.getInteger("redis.port", 6379));
  }

  public static String getKey(String key) {
    return VERTX_KUE_REDIS_PREFIX + ":" + key;
  }

  public static String getStateKey(JobState state) {
    return VERTX_KUE_REDIS_PREFIX + ":jobs:" + state.name();
  }

  public static String createFIFO(long id) {
    String idLen = "" + ("" + id).length();
    int len = 2 - idLen.length();
    while (len-- > 0)
      idLen = "0" + idLen;
    return idLen + "|" + id;
  }

  public static String stripFIFO(String zid) {
    return zid.substring(zid.indexOf("|") + 1);
  }

  public static long numStripFIFO(String zid) {
    return Long.parseLong(zid.substring(zid.indexOf("|") + 1));
  }
}

所有的key都必須在vertx_kue命名空間下,因此我們封裝了一個(gè)getKey方法。我們還實(shí)現(xiàn)了createFIFOstripFIFO方法用于生成zid以及解碼zidzid的格式使用了Automattic/Kue中的格式。

回到state方法來(lái)。我們使用zrem(String key, String member, Handler> handler)方法將特定的數(shù)據(jù)從有序集合中移除。兩個(gè)key分別是vertx_kue:job:{state} 以及 vertx_kue:jobs:{type}:{state}member對(duì)應(yīng)著任務(wù)的zid。

接下來(lái)我們使用hset方法來(lái)變更新的狀態(tài) (3),然后用zadd方法往vertx_kue:job:{state}vertx_kue:jobs:{type}:{state}兩個(gè)有序集合中添加此任務(wù)的zid,同時(shí)傳遞一個(gè)權(quán)重(score)。這個(gè)非常重要,我們就是通過(guò)這個(gè)實(shí)現(xiàn)優(yōu)先級(jí)隊(duì)列的。我們直接使用priority對(duì)應(yīng)的值作為score。這樣,當(dāng)我們需要從Redis中獲取任務(wù)的時(shí)候,我們就可以通過(guò)zpop方法獲取優(yōu)先級(jí)最高的任務(wù)。我們會(huì)在后面詳細(xì)講述。

不同的新?tīng)顟B(tài)需要不同的操作。對(duì)于ACTIVE狀態(tài),我們通過(guò)zadd命令將zid添加至vertx_kue:jobs:ACTIVE有序集合中并賦予優(yōu)先級(jí)權(quán)值 (4)。對(duì)于DELAYED狀態(tài),我們通過(guò)zadd命令將zid添加至vertx_kue:jobs:DELAYED有序集合中并賦予提升時(shí)間(promote_at)權(quán)值 (5)。對(duì)于INACTIVE狀態(tài),我們向vertx:kue:{type}:jobs列表中添加一個(gè)元素 (6)。這些操作都是在Redis事務(wù)塊內(nèi)完成的。最后我們通過(guò)exec方法一并執(zhí)行這些事務(wù)操作 (7)。如果執(zhí)行成功,我們給future賦值(當(dāng)前任務(wù))。最后我們返回future并且與updateNow方法相組合。

updateNow方法非常簡(jiǎn)單,就是把updated_at的值設(shè)為當(dāng)前時(shí)間,然后存到Redis中:

Future updateNow() {
  this.updated_at = System.currentTimeMillis();
  return this.set("updated_at", String.valueOf(updated_at));
}
保存任務(wù)

這里我們來(lái)看一下整個(gè)Job類(lèi)中最重要的方法之一 - save方法,它的作用是保存任務(wù)至Redis中。

public Future save() {
  // check
  Objects.requireNonNull(this.type, "Job type cannot be null"); // (1)

  if (this.id > 0)
    return update(); // (2)

  Future future = Future.future();

  // 生成id
  client.incr(RedisHelper.getKey("ids"), res -> { // (3)
    if (res.succeeded()) {
      this.id = res.result();
      this.zid = RedisHelper.createFIFO(id); // (4)
      String key = RedisHelper.getKey("job:" + this.id);

      if (this.delay > 0) {
        this.state = JobState.DELAYED;
      }

      client.sadd(RedisHelper.getKey("job:types"), this.type, _failure()); // (5)
       this.created_at = System.currentTimeMillis();
       this.promote_at = this.created_at + this.delay;
       // 保存任務(wù)
       client.hmset(key, this.toJson(), _completer(future, this)); // (6)
    } else {
      future.fail(res.cause());
    }
  });

  return future.compose(Job::update); // (7)
}

首先,任務(wù)類(lèi)型不能為空所以我們要檢查type是否為空 (1)。接著,如果當(dāng)前任務(wù)的id大于0,則代表此任務(wù)已經(jīng)存儲(chǔ)過(guò)(因?yàn)閕d是存儲(chǔ)時(shí)分配),此時(shí)只需執(zhí)行更新操作(update)即可 (2)。然后我們創(chuàng)建一個(gè)Future對(duì)象,然后使用incr方法從vertx_kue:ids字段獲取一個(gè)新的id (3)。同時(shí)我們使用RedisHelper.createFIFO(id)方法來(lái)生成新的zid (4)。接著我們來(lái)判斷任務(wù)延時(shí)是否大于0,若大于0則將當(dāng)前任務(wù)狀態(tài)設(shè)置為DELAYED。然后我們通過(guò)sadd方法將當(dāng)前任務(wù)類(lèi)型添加至vertx:kue:job:types列表中 (5) 并且保存任務(wù)創(chuàng)建時(shí)間(created_at)以及任務(wù)提升時(shí)間(promote_at)。經(jīng)過(guò)這一系列的操作后,所有的屬性都已準(zhǔn)備好,所以我們可以利用hmset方法將此任務(wù)實(shí)體存儲(chǔ)至vertx:kue:job:{id}哈希表中 (6)。如果存儲(chǔ)操作成功,那么將當(dāng)前任務(wù)實(shí)體賦給future,否則記錄錯(cuò)誤。最后我們返回此future并且將其與update方法進(jìn)行組合。

update方法進(jìn)行一些更新操作,它的邏輯比較簡(jiǎn)單:

Future update() {
  Future future = Future.future();
  this.updated_at = System.currentTimeMillis();

  client.transaction().multi(_failure())
    .hset(RedisHelper.getKey("job:" + this.id), "updated_at", String.valueOf(this.updated_at), _failure())
    .zadd(RedisHelper.getKey("jobs"), this.priority.getValue(), this.zid, _failure())
    .exec(_completer(future, this));

  return future.compose(r ->
    this.state(this.state));
}

可以看到update方法只做了三件微小的工作:存儲(chǔ)任務(wù)更新時(shí)間、存儲(chǔ)zid以及更改當(dāng)前任務(wù)狀態(tài)(組合state方法)。

最后總結(jié)一下將一個(gè)任務(wù)存儲(chǔ)到Redis中經(jīng)過(guò)的步驟:save -> update -> state :-)

移除任務(wù)

移除任務(wù)非常簡(jiǎn)單,借助zremdel方法即可。我們來(lái)看一下其實(shí)現(xiàn):

public Future remove() {
  Future future = Future.future();
  client.transaction().multi(_failure())
    .zrem(RedisHelper.getKey("jobs:" + this.stateName()), this.zid, _failure())
    .zrem(RedisHelper.getKey("jobs:" + this.type + ":" + this.stateName()), this.zid, _failure())
    .zrem(RedisHelper.getKey("jobs"), this.zid, _failure())
    .del(RedisHelper.getKey("job:" + this.id + ":log"), _failure())
    .del(RedisHelper.getKey("job:" + this.id), _failure())
    .exec(r -> {
      if (r.succeeded()) {
        this.emit("remove", new JsonObject().put("id", this.id));
        future.complete();
      } else {
        future.fail(r.cause());
      }
    });
  return future;
}

注意到成功移除任務(wù)時(shí),我們會(huì)向Event Bus上的特定地址發(fā)送remove任務(wù)事件。此事件包含著被移除任務(wù)的id。

監(jiān)聽(tīng)任務(wù)事件

我們可以通過(guò)幾種 onXXX 方法來(lái)監(jiān)聽(tīng)任務(wù)事件:

@Fluent
public Job onComplete(Handler completeHandler) {
  this.on("complete", message -> {
    completeHandler.handle(new Job((JsonObject) message.body()));
  });
  return this;
}

@Fluent
public Job onFailure(Handler failureHandler) {
  this.on("failed", message -> {
    failureHandler.handle((JsonObject) message.body());
  });
  return this;
}

@Fluent
public Job onFailureAttempt(Handler failureHandler) {
  this.on("failed_attempt", message -> {
    failureHandler.handle((JsonObject) message.body());
  });
  return this;
}

@Fluent
public Job onPromotion(Handler handler) {
  this.on("promotion", message -> {
    handler.handle(new Job((JsonObject) message.body()));
  });
  return this;
}

@Fluent
public Job onStart(Handler handler) {
  this.on("start", message -> {
    handler.handle(new Job((JsonObject) message.body()));
  });
  return this;
}

@Fluent
public Job onRemove(Handler removeHandler) {
  this.on("start", message -> {
    removeHandler.handle((JsonObject) message.body());
  });
  return this;
}

@Fluent
public Job onProgress(Handler progressHandler) {
  this.on("progress", message -> {
    progressHandler.handle((Integer) message.body());
  });
  return this;
}

注意到不同的事件,對(duì)應(yīng)接收的數(shù)據(jù)類(lèi)型也有差異。我們來(lái)說(shuō)明一下:

onCompleteonPromotion 以及 onStart: 發(fā)送的數(shù)據(jù)是對(duì)應(yīng)的Job對(duì)象

onFailure and onFailureAttempt: 發(fā)送的數(shù)據(jù)是JsonObject類(lèi)型的,其格式類(lèi)似于:

{
    "job": {},
    "extra": {
        "message": "some_error"
    }
}

onProgress: 發(fā)送的數(shù)據(jù)是當(dāng)前任務(wù)進(jìn)度

onRemove: 發(fā)送的數(shù)據(jù)是JsonObject類(lèi)型的,其中id代表被移除任務(wù)的編號(hào)

更新任務(wù)進(jìn)度

我們可以通過(guò)progress方法來(lái)更新任務(wù)進(jìn)度??匆幌缕鋵?shí)現(xiàn):

public Future progress(int complete, int total) {
  int n = Math.min(100, complete * 100 / total); // (1)
  this.emit("progress", n); // (2)
  return this.setProgress(n) // (3)
    .set("progress", String.valueOf(n))
    .compose(Job::updateNow);
}

progress方法接受兩個(gè)參數(shù):第一個(gè)是當(dāng)前完成的進(jìn)度值,第二個(gè)是完成狀態(tài)需要的進(jìn)度值。我們首先計(jì)算出當(dāng)前的進(jìn)度 (1),然后向特定地址發(fā)送progress事件 (2)。最后我們將進(jìn)度存儲(chǔ)至Redis中并更新時(shí)間,返回Future (3)。

任務(wù)失敗以及重試機(jī)制

當(dāng)一個(gè)任務(wù)處理失敗時(shí),如果它有剩余的重試次數(shù),Vert.x Kue會(huì)自動(dòng)調(diào)用failAttempt方法進(jìn)行重試。我們來(lái)看一下failAttempt方法的實(shí)現(xiàn):

Future failedAttempt(Throwable err) {
  return this.error(err)
    .compose(Job::failed)
    .compose(Job::attemptInternal);
}

(⊙o⊙)非常簡(jiǎn)短吧~實(shí)際上,failAttempt方法是三個(gè)異步方法的組合:error、failed以及attemptInternal。當(dāng)一個(gè)任務(wù)需要進(jìn)行重試的時(shí)候,我們首先向Event Bus發(fā)布 error 隊(duì)列事件并且在Redis中記錄日志,然后將當(dāng)前的任務(wù)狀態(tài)置為FAILED,最后重新處理此任務(wù)。

我們先來(lái)看一下error方法:

public Future error(Throwable ex) {
  return this.emitError(ex)
    .set("error", ex.getMessage())
    .compose(j -> j.log("error | " + ex.getMessage()));
}

它的邏輯很簡(jiǎn)單:首先我們向Event Bus發(fā)布 錯(cuò)誤 事件,然后記錄錯(cuò)誤日志即可。這里我們封裝了一個(gè)發(fā)布錯(cuò)誤的函數(shù)emitError

@Fluent
public Job emitError(Throwable ex) {
  JsonObject errorMessage = new JsonObject().put("id", this.id)
    .put("message", ex.getMessage());
  eventBus.publish(Kue.workerAddress("error"), errorMessage);
  eventBus.send(Kue.getCertainJobAddress("error", this), errorMessage);
  return this;
}

其中發(fā)送的錯(cuò)誤信息格式類(lèi)似于下面的樣子:

{
    "id": 2052,
    "message": "some error"
}

接下來(lái)我們?cè)賮?lái)看一下failed方法的實(shí)現(xiàn):

public Future failed() {
  this.failed_at = System.currentTimeMillis();
  return this.updateNow()
    .compose(j -> j.set("failed_at", String.valueOf(j.failed_at)))
    .compose(j -> j.state(JobState.FAILED));
}

非常簡(jiǎn)單,首先我們更新任務(wù)的更新時(shí)間和失敗時(shí)間,然后通過(guò)state方法將當(dāng)前任務(wù)狀態(tài)置為FAILED即可。

任務(wù)重試的核心邏輯在attemptInternal方法中:

private Future attemptInternal() {
  int remaining = this.max_attempts - this.attempts; // (1)
  if (remaining > 0) { // 還有重試次數(shù)
    return this.attemptAdd() // (2)
      .compose(Job::reattempt) // (3)
      .setHandler(r -> {
        if (r.failed()) {
          this.emitError(r.cause()); // (4)
        }
      });
  } else if (remaining == 0) { // (5)
    return Future.failedFuture("No more attempts");
  } else { // (6)
    return Future.failedFuture(new IllegalStateException("Attempts Exceeded"));
  }
}

在我們的Job數(shù)據(jù)對(duì)象中,我們存儲(chǔ)了最大重試次數(shù)max_attempts以及已經(jīng)重試的次數(shù)attempts,所以我們首先根據(jù)這兩個(gè)數(shù)據(jù)計(jì)算剩余的重試次數(shù)remaining (1)。如果還有剩余次數(shù)的話,我們就先調(diào)用attemptAdd方法增加一次已重試次數(shù)并 (2),然后我們調(diào)用reattempt方法執(zhí)行真正的任務(wù)重試邏輯 (3)。最后返回這兩個(gè)異步方法組合的Future。如果其中一個(gè)過(guò)程出現(xiàn)錯(cuò)誤,我們就發(fā)布error事件 (4)。如果沒(méi)有剩余次數(shù)了或者超出剩余次數(shù)了,我們直接返回錯(cuò)誤。

在我們解析reattempt方法之前,我們先來(lái)回顧一下Vert.x Kue中的任務(wù)失敗恢復(fù)機(jī)制。Vert.x Kue支持延時(shí)重試機(jī)制(retry backoff),并且支持不同的策略(如 fixed 以及 exponential)。之前我們提到Job類(lèi)中有一個(gè)backoff成員變量,它用于配置延時(shí)重試的策略。它的格式類(lèi)似于這樣:

{
    "type": "fixed",
    "delay": 5000
}

延時(shí)重試機(jī)制的實(shí)現(xiàn)在getBackoffImpl方法中,它返回一個(gè)Function對(duì)象,代表一個(gè)接受Integer類(lèi)型(即attempts),返回Long類(lèi)型(代表計(jì)算出的延時(shí)值)的函數(shù):

private Function getBackoffImpl() {
  String type = this.backoff.getString("type", "fixed"); // (1)
  long _delay = this.backoff.getLong("delay", this.delay); // (2)
  switch (type) {
    case "exponential": // (3)
      return attempts -> Math.round(_delay * 0.5 * (Math.pow(2, attempts) - 1));
    case "fixed":
    default: // (4)
      return attempts -> _delay;
  }
}

首先我們從backoff配置中獲取延遲重試策略。目前Vert.x Kue支持兩種策略:fixedexponential。前者采用固定延遲時(shí)間,而后者采用指數(shù)增長(zhǎng)型延遲時(shí)間。默認(rèn)情況下Vert.x Kue會(huì)采用fixed策略 (1)。接下來(lái)我們從backoff配置中獲取延遲時(shí)間,如果配置中沒(méi)有指定,那么就使用任務(wù)對(duì)象中的延遲時(shí)間delay (2)。接下來(lái)就是根據(jù)具體的策略進(jìn)行計(jì)算了。對(duì)于指數(shù)型延遲,我們計(jì)算[delay * 0.5 * 2^attempts]作為延遲時(shí)間 (3);對(duì)于固定型延遲策略,我們直接使用獲取到的延遲時(shí)間 (4)。

好啦,現(xiàn)在回到“真正的重試”方法 —— reattempt方法來(lái):

private Future reattempt() {
  if (this.backoff != null) {
    long delay = this.getBackoffImpl().apply(attempts); // (1)
    return this.setDelay(delay)
      .setPromote_at(System.currentTimeMillis() + delay)
      .update() // (2)
      .compose(Job::delayed); // (3)
  } else {
    return this.inactive(); // (4)
  }
}

首先我們先檢查backoff配置是否存在,若存在則計(jì)算出對(duì)應(yīng)的延時(shí)時(shí)間 (1) 并且設(shè)定delaypromote_at屬性的值然后保存至Redis中 (2)。接著我們通過(guò)delayed方法將任務(wù)的狀態(tài)設(shè)為延時(shí)(DELAYED) (3)。如果延時(shí)重試配置不存在,我們就通過(guò)inactive方法直接將此任務(wù)置入工作隊(duì)列中 (4)。

這就是整個(gè)任務(wù)重試功能的實(shí)現(xiàn),也不是很復(fù)雜蛤?觀察上面的代碼,我們可以發(fā)現(xiàn)Future組合無(wú)處不在。這種響應(yīng)式的組合非常方便。想一想如果我們用回調(diào)的異步方式來(lái)寫(xiě)代碼的話,我們很容易陷入回調(diào)地獄中(⊙o⊙)。。。幾個(gè)回調(diào)嵌套起來(lái)總顯得不是那么優(yōu)美和簡(jiǎn)潔,而用響應(yīng)式的、可組合的Future就可以有效地避免這個(gè)問(wèn)題。

不錯(cuò)!到現(xiàn)在為止我們已經(jīng)探索完Job類(lèi)的源碼了~下面我們來(lái)看一下JobService類(lèi)。

Event Bus 服務(wù) - JobService

在本章節(jié)中我們來(lái)探索一下JobService接口及其實(shí)現(xiàn) —— 它包含著各種普通的操作和統(tǒng)計(jì)Job的邏輯。

異步RPC

我們的JobService是一個(gè)通用邏輯接口,因此我們希望應(yīng)用中的每一個(gè)組件都能訪問(wèn)此服務(wù),即進(jìn)行RPC。在Vert.x中,我們可以將服務(wù)注冊(cè)至Event Bus上,然后其它組件就可以通過(guò)Event Bus來(lái)遠(yuǎn)程調(diào)用注冊(cè)的服務(wù)了。

傳統(tǒng)的RPC有一個(gè)缺點(diǎn):消費(fèi)者需要阻塞等待生產(chǎn)者的回應(yīng)。你可能想說(shuō):這是一種阻塞模型,和Vert.x推崇的異步開(kāi)發(fā)模式不相符。沒(méi)錯(cuò)!而且,傳統(tǒng)的RPC不是真正面向失敗設(shè)計(jì)的。

還好,Vert.x提供了一種高效的、響應(yīng)式的RPC —— 異步RPC。我們不需要等待生產(chǎn)者的回應(yīng),而只需要傳遞一個(gè)Handler>參數(shù)給異步方法。這樣當(dāng)收到生產(chǎn)者結(jié)果時(shí),對(duì)應(yīng)的Handler就會(huì)被調(diào)用,非常方便,這與Vert.x的異步開(kāi)發(fā)模式相符。并且,AsyncResult也是面向失敗設(shè)計(jì)的。

所以講到這里,你可能想問(wèn):到底怎么在Event Bus上注冊(cè)服務(wù)呢?我們是不是需要寫(xiě)一大堆的邏輯去包裝和發(fā)送信息,然后在另一端解碼信息并進(jìn)行調(diào)用呢?不,這太麻煩了!有了Vert.x 服務(wù)代理,我們不需要這么做!Vert.x提供了一個(gè)組件 Vert.x Service Proxy 來(lái)自動(dòng)生成服務(wù)代理。有了它的幫助,我們就只需要按照規(guī)范設(shè)計(jì)我們的異步服務(wù)接口,然后用@ProxyGen注解修飾即可。

@ProxyGen注解的限制
@ProxyGen注解的使用有諸多限制。比如,所有的異步方法都必須是基于回調(diào)的,也就是說(shuō)每個(gè)方法都要接受一個(gè)Handler>類(lèi)型的參數(shù)。并且,類(lèi)型R也是有限制的 —— 只允許基本類(lèi)型以及數(shù)據(jù)對(duì)象類(lèi)型。詳情請(qǐng)參考官方文檔。

異步服務(wù)接口

我們來(lái)看一下JobService的源碼:

@ProxyGen
@VertxGen
public interface JobService {

  static JobService create(Vertx vertx, JsonObject config) {
    return new JobServiceImpl(vertx, config);
  }

  static JobService createProxy(Vertx vertx, String address) {
    return ProxyHelper.createProxy(JobService.class, vertx, address);
  }

  /**
   * 獲取任務(wù),按照優(yōu)先級(jí)順序
   *
   * @param id      job id
   * @param handler async result handler
   */
  @Fluent
  JobService getJob(long id, Handler> handler);

  /**
   * 刪除任務(wù)
   *
   * @param id      job id
   * @param handler async result handler
   */
  @Fluent
  JobService removeJob(long id, Handler> handler);

  /**
   * 判斷任務(wù)是否存在
   *
   * @param id      job id
   * @param handler async result handler
   */
  @Fluent
  JobService existsJob(long id, Handler> handler);

  /**
   * 獲取任務(wù)日志
   *
   * @param id      job id
   * @param handler async result handler
   */
  @Fluent
  JobService getJobLog(long id, Handler> handler);

  /**
   * 獲取某一范圍內(nèi)某個(gè)指定狀態(tài)下的任務(wù)列表
   *
   * @param state   expected job state
   * @param from    from
   * @param to      to
   * @param order   range order
   * @param handler async result handler
   */
  @Fluent
  JobService jobRangeByState(String state, long from, long to, String order, Handler>> handler);

  /**
   * 獲取某一范圍內(nèi)某個(gè)指定狀態(tài)和類(lèi)型下的任務(wù)列表
   *
   * @param type    expected job type
   * @param state   expected job state
   * @param from    from
   * @param to      to
   * @param order   range order
   * @param handler async result handler
   */
  @Fluent
  JobService jobRangeByType(String type, String state, long from, long to, String order, Handler>> handler);

  /**
   * 獲取某一范圍內(nèi)的任務(wù)列表(按照順序或倒序)
   *
   * @param from    from
   * @param to      to
   * @param order   range order
   * @param handler async result handler
   */
  @Fluent
  JobService jobRange(long from, long to, String order, Handler>> handler);

  // 統(tǒng)計(jì)函數(shù)

  /**
   * 獲取指定狀態(tài)和類(lèi)型下的任務(wù)的數(shù)量
   *
   * @param type    job type
   * @param state   job state
   * @param handler async result handler
   */
  @Fluent
  JobService cardByType(String type, JobState state, Handler> handler);

  /**
   * 獲取某個(gè)狀態(tài)下的任務(wù)的數(shù)量
   *
   * @param state   job state
   * @param handler async result handler
   */
  @Fluent
  JobService card(JobState state, Handler> handler);

  /**
   * 獲取COMPLETE狀態(tài)任務(wù)的數(shù)量
   *
   * @param type    job type; if null, then return global metrics
   * @param handler async result handler
   */
  @Fluent
  JobService completeCount(String type, Handler> handler);

  /**
   * 獲取FAILED狀態(tài)任務(wù)的數(shù)量
   *
   * @param type job type; if null, then return global metrics
   */
  @Fluent
  JobService failedCount(String type, Handler> handler);

  /**
   * 獲取INACTIVE狀態(tài)任務(wù)的數(shù)量
   *
   * @param type job type; if null, then return global metrics
   */
  @Fluent
  JobService inactiveCount(String type, Handler> handler);

  /**
   * 獲取ACTIVE狀態(tài)任務(wù)的數(shù)量
   *
   * @param type job type; if null, then return global metrics
   */
  @Fluent
  JobService activeCount(String type, Handler> handler);

  /**
   * 獲取DELAYED狀態(tài)任務(wù)的數(shù)量
   *
   * @param type job type; if null, then return global metrics
   */
  @Fluent
  JobService delayedCount(String type, Handler> handler);

  /**
   * 獲取當(dāng)前存在的所有任務(wù)類(lèi)型
   *
   * @param handler async result handler
   */
  @Fluent
  JobService getAllTypes(Handler>> handler);

  /**
   * 獲取指定狀態(tài)下的所有任務(wù)的ID
   *
   * @param state   job state
   * @param handler async result handler
   */
  @Fluent
  JobService getIdsByState(JobState state, Handler>> handler);

  /**
   * 工作隊(duì)列運(yùn)行時(shí)間(ms)
   *
   * @param handler async result handler
   */
  @Fluent
  JobService getWorkTime(Handler> handler);
}

可以看到我們還為JobService接口添加了@VertxGen注解,Vert.x Codegen可以處理此注解生成多種語(yǔ)言版本的服務(wù)。

JobService接口中我們還定義了兩個(gè)靜態(tài)方法:create用于創(chuàng)建一個(gè)任務(wù)服務(wù)實(shí)例,createProxy用于創(chuàng)建一個(gè)服務(wù)代理。

JobService接口中包含一些任務(wù)操作和統(tǒng)計(jì)的相關(guān)邏輯,每個(gè)方法的功能都已經(jīng)在注釋中闡述了,因此我們就直接來(lái)看它的實(shí)現(xiàn)吧~

任務(wù)服務(wù)的實(shí)現(xiàn)

JobService接口的實(shí)現(xiàn)位于JobServiceImpl類(lèi)中,代碼非常長(zhǎng),因此這里就不貼代碼了。。。大家可以對(duì)照GitHub中的代碼讀下面的內(nèi)容。

getJob: 獲取任務(wù)的方法非常簡(jiǎn)單。直接利用hgetall命令從Redis中取出對(duì)應(yīng)的任務(wù)即可。

removeJob: 我們可以將此方法看作是getJobJob#remove兩個(gè)方法的組合。

existsJob: 使用exists命令判斷對(duì)應(yīng)id的任務(wù)是否存在。

getJobLog: 使用lrange命令從vertx_kue:job:{id}:log列表中取出日志。

rangeGeneral: 使用zrange命令獲取一定范圍內(nèi)的任務(wù),這是一個(gè)通用方法。

zrange 操作
zrange 返回某一有序集合中某個(gè)特定范圍內(nèi)的元素。詳情請(qǐng)見(jiàn)ZRANGE - Redis。

以下三個(gè)方法復(fù)用了rangeGeneral方法:

jobRangeByState: 指定狀態(tài),對(duì)應(yīng)的key為vertx_kue:jobs:{state}。

jobRangeByType: 指定狀態(tài)和類(lèi)型,對(duì)應(yīng)的key為vertx_kue:jobs:{type}:{state}。

jobRange: 對(duì)應(yīng)的key為vertx_kue:jobs。

這兩個(gè)通用方法用于任務(wù)數(shù)量的統(tǒng)計(jì):

cardByType: 利用zcard命令獲取某一指定狀態(tài)和類(lèi)型下任務(wù)的數(shù)量。

card: 利用zcard命令獲取某一指定狀態(tài)下任務(wù)的數(shù)量。

下面五個(gè)輔助統(tǒng)計(jì)方法復(fù)用了上面兩個(gè)通用方法:

completeCount

failedCount

delayedCount

inactiveCount

activeCount

接著看:

getAllTypes: 利用smembers命令獲取vertx_kue:job:types集合中存儲(chǔ)的所有的任務(wù)類(lèi)型。

getIdsByState: 使用zrange獲取某一指定狀態(tài)下所有任務(wù)的ID。

getWorkTime: 使用get命令從vertx_kue:stats:work-time中獲取Vert.x Kue的工作時(shí)間。

注冊(cè)任務(wù)服務(wù)

既然完成了JobService的實(shí)現(xiàn),接下來(lái)我們來(lái)看一下如何利用Service Proxy將服務(wù)注冊(cè)至Event Bus上。這里我們還需要一個(gè)KueVerticle來(lái)創(chuàng)建要注冊(cè)的服務(wù)實(shí)例,并且將其注冊(cè)至Event Bus上。

打開(kāi)io.vertx.blueprint.kue.queue.KueVerticle類(lèi)的源碼:

package io.vertx.blueprint.kue.queue;

import io.vertx.blueprint.kue.service.JobService;
import io.vertx.blueprint.kue.util.RedisHelper;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.redis.RedisClient;
import io.vertx.serviceproxy.ProxyHelper;


public class KueVerticle extends AbstractVerticle {

  private static Logger logger = LoggerFactory.getLogger(Job.class);

  public static final String EB_JOB_SERVICE_ADDRESS = "vertx.kue.service.job.internal"; // (1)

  private JsonObject config;
  private JobService jobService;

  @Override
  public void start(Future future) throws Exception {
    this.config = config();
    this.jobService = JobService.create(vertx, config); // (2)
    // create redis client
    RedisClient redisClient = RedisHelper.client(vertx, config);
    redisClient.ping(pr -> { // (3) test connection
      if (pr.succeeded()) {
        logger.info("Kue Verticle is running...");

        // (4) register job service
        ProxyHelper.registerService(JobService.class, vertx, jobService, EB_JOB_SERVICE_ADDRESS);

        future.complete();
      } else {
        logger.error("oops!", pr.cause());
        future.fail(pr.cause());
      }
    });
  }

}

首先我們需要定義一個(gè)地址用于服務(wù)注冊(cè) (1)。在start方法中,我們創(chuàng)建了一個(gè)任務(wù)服務(wù)實(shí)例 (2),然后通過(guò)ping命令測(cè)試Redis連接 (3)。如果連接正常,那么我們就可以通過(guò)ProxyHelper類(lèi)中的registerService輔助方法來(lái)將服務(wù)實(shí)例注冊(cè)至Event Bus上 (4)。

這樣,一旦我們?cè)诩耗J较虏渴?b>KueVerticle,服務(wù)就會(huì)被發(fā)布至Event Bus上,然后我們就可以在其他組件中去遠(yuǎn)程調(diào)用此服務(wù)了。很奇妙吧!

Kue - 工作隊(duì)列

Kue類(lèi)代表著工作隊(duì)列。我們來(lái)看一下Kue類(lèi)的實(shí)現(xiàn)。首先先看一下其構(gòu)造函數(shù):

public Kue(Vertx vertx, JsonObject config) {
  this.vertx = vertx;
  this.config = config;
  this.jobService = JobService.createProxy(vertx, EB_JOB_SERVICE_ADDRESS);
  this.client = RedisHelper.client(vertx, config);
  Job.setVertx(vertx, RedisHelper.client(vertx, config)); // init static vertx instance inner job
}

這里我們需要注意兩點(diǎn):第一點(diǎn),我們通過(guò)createProxy方法來(lái)創(chuàng)建一個(gè)JobService的服務(wù)代理;第二點(diǎn),之前提到過(guò),我們需要在這里初始化Job類(lèi)中的靜態(tài)成員變量。

基于Future的封裝

我們的JobService是基于回調(diào)的,這是服務(wù)代理組件所要求的。為了讓Vert.x Kue更加響應(yīng)式,使用起來(lái)更加方便,我們?cè)?b>Kue類(lèi)中以基于Future的異步模式封裝了JobService中的所有異步方法。這很簡(jiǎn)單,比如這個(gè)方法:

@Fluent
JobService getJob(long id, Handler> handler);

可以這么封裝:

public Future> getJob(long id) {
  Future> future = Future.future();
  jobService.getJob(id, r -> {
    if (r.succeeded()) {
      future.complete(Optional.ofNullable(r.result()));
    } else {
      future.fail(r.cause());
    }
  });
  return future;
}

其實(shí)就是加一層Future。其它的封裝過(guò)程也類(lèi)似所以我們就不細(xì)說(shuō)了。

process和processBlocking方法

processprocessBlocking方法用于處理任務(wù):

public Kue process(String type, int n, Handler handler) {
  if (n <= 0) {
    throw new IllegalStateException("The process times must be positive");
  }
  while (n-- > 0) {
    processInternal(type, handler, false);
  }f
  setupTimers();
  return this;
}

public Kue process(String type, Handler handler) {
  processInternal(type, handler, false);
  setupTimers();
  return this;
}

public Kue processBlocking(String type, int n, Handler handler) {
  if (n <= 0) {
    throw new IllegalStateException("The process times must be positive");
  }
  while (n-- > 0) {
    processInternal(type, handler, true);
  }
  setupTimers();
  return this;
}

兩個(gè)process方法都類(lèi)似 —— 它們都是使用Event Loop線程處理任務(wù)的,其中第一個(gè)方法還可以指定同時(shí)處理任務(wù)數(shù)量的閾值。我們來(lái)回顧一下使用Event Loop線程的注意事項(xiàng) —— 我們不能阻塞Event Loop線程。因此如果我們需要在處理任務(wù)時(shí)做一些耗時(shí)的操作,我們可以使用processBlocking方法。這幾個(gè)方法的代碼看起來(lái)都差不多,那么區(qū)別在哪呢?之前我們提到過(guò),我們?cè)O(shè)計(jì)了一種Verticle - KueWorker,用于處理任務(wù)。因此對(duì)于process方法來(lái)說(shuō),KueWorker就是一種普通的Verticle;而對(duì)于processBlocking方法來(lái)說(shuō),KueWorker是一種Worker Verticle。這兩種Verticle有什么不同呢?區(qū)別在于,Worker Verticle會(huì)使用Worker線程,因此即使我們執(zhí)行一些耗時(shí)的操作,Event Loop線程也不會(huì)被阻塞。

創(chuàng)建及部署KueWorker的邏輯在processInternal方法中,這三個(gè)方法都使用了processInternal方法:

private void processInternal(String type, Handler handler, boolean isWorker) {
  KueWorker worker = new KueWorker(type, handler, this); // (1)
  vertx.deployVerticle(worker, new DeploymentOptions().setWorker(isWorker), r0 -> { // (2)
    if (r0.succeeded()) {
      this.on("job_complete", msg -> {
        long dur = new Job(((JsonObject) msg.body()).getJsonObject("job")).getDuration();
        client.incrby(RedisHelper.getKey("stats:work-time"), dur, r1 -> { // (3)
          if (r1.failed())
            r1.cause().printStackTrace();
        });
      });
    }
  });
}

首先我們創(chuàng)建一個(gè)KueWorker實(shí)例 (1)。我們將在稍后詳細(xì)介紹KueWorker的實(shí)現(xiàn)。然后我們根據(jù)提供的配置來(lái)部署此KueWorker (2)。processInternal方法的第三個(gè)參數(shù)代表此KueWorker是否為worker verticle。如果部署成功,我們就監(jiān)聽(tīng)complete事件。每當(dāng)接收到complete事件的時(shí)候,我們獲取收到的信息(處理任務(wù)消耗的時(shí)間),然后用incrby增加對(duì)應(yīng)的工作時(shí)間 (3)。

再回到前面三個(gè)處理方法中。除了部署KueWorker以外,我們還調(diào)用了setupTimers方法,用于設(shè)定定時(shí)器以監(jiān)測(cè)延時(shí)任務(wù)以及監(jiān)測(cè)活動(dòng)任務(wù)TTL。

監(jiān)測(cè)延時(shí)任務(wù)

Vert.x Kue支持延時(shí)任務(wù),因此我們需要在任務(wù)延時(shí)時(shí)間到達(dá)時(shí)將任務(wù)“提升”至工作隊(duì)列中等待處理。這個(gè)工作是在checkJobPromotion方法中實(shí)現(xiàn)的:

private void checkJobPromotion() {
  int timeout = config.getInteger("job.promotion.interval", 1000); // (1)
  int limit = config.getInteger("job.promotion.limit", 1000); // (2)
  vertx.setPeriodic(timeout, l -> { // (3)
    client.zrangebyscore(RedisHelper.getKey("jobs:DELAYED"), String.valueOf(0), String.valueOf(System.currentTimeMillis()),
      new RangeLimitOptions(new JsonObject().put("offset", 0).put("count", limit)), r -> {  // (4)
        if (r.succeeded()) {
          r.result().forEach(r1 -> {
            long id = Long.parseLong(RedisHelper.stripFIFO((String) r1));
            this.getJob(id).compose(jr -> jr.get().inactive())  // (5)
              .setHandler(jr -> {
                if (jr.succeeded()) {
                  jr.result().emit("promotion", jr.result().getId()); // (6)
                } else {
                  jr.cause().printStackTrace();
                }
              });
          });
        } else {
          r.cause().printStackTrace();
        }
      });
  });
}

首先我們從配置中獲取監(jiān)測(cè)延時(shí)任務(wù)的間隔(job.promotion.interval,默認(rèn)1000ms)以及提升數(shù)量閾值(job.promotion.limit,默認(rèn)1000)。然后我們使用vertx.setPeriodic方法設(shè)一個(gè)周期性的定時(shí)器 (3),每隔一段時(shí)間就從Redis中獲取需要被提升的任務(wù) (4)。這里我們通過(guò)zrangebyscore獲取每個(gè)需要被提升任務(wù)的id。我們來(lái)看一下zrangebyscore方法的定義:

RedisClient zrangebyscore(String key, String min, String max, RangeLimitOptions options, Handler> handler);

key: 某個(gè)有序集合的key,即vertx_kue:jobs:DELAYED

min and max: 最小值以及最大值(按照某種模式)。這里min0,而max是當(dāng)前時(shí)間戳

我們來(lái)回顧一下Job類(lèi)中的state方法。當(dāng)我們要把任務(wù)狀態(tài)設(shè)為DELAYED的時(shí)候,我們將score設(shè)為promote_at時(shí)間:

case DELAYED:
  client.transaction().zadd(RedisHelper.getKey("jobs:" + newState.name()),
    this.promote_at, this.zid, _failure());

因此我們將max設(shè)為當(dāng)前時(shí)間(System.currentTimeMillis()),只要當(dāng)前時(shí)間超過(guò)需要提升的時(shí)間,這就說(shuō)明此任務(wù)可以被提升了。

options: range和limit配置。這里我們需要指定LIMIT值所以我們用new RangeLimitOptions(new JsonObject().put("offset", 0).put("count", limit)創(chuàng)建了一個(gè)配置

zrangebyscore的結(jié)果是一個(gè)JsonArray,里面包含著所有等待提升任務(wù)的zid。獲得結(jié)果后我們就將每個(gè)zid轉(zhuǎn)換為id,然后分別獲取對(duì)應(yīng)的任務(wù)實(shí)體,最后對(duì)每個(gè)任務(wù)調(diào)用inactive方法來(lái)將任務(wù)狀態(tài)設(shè)為INACTIVE (5)。如果任務(wù)成功提升至工作隊(duì)列,我們就發(fā)送promotion事件 (6)。

CallbackKue - 提供多語(yǔ)言支持

我們知道,Vert.x支持多種語(yǔ)言(如JS,Ruby),因此如果能讓我們的Vert.x Kue支持多種語(yǔ)言那當(dāng)然是極好的!這沒(méi)有問(wèn)題~Vert.x Codegen可以處理含@VertxGen注解的異步接口,生成多語(yǔ)言版本。@VertxGen注解同樣限制異步方法 —— 需要基于回調(diào),因此我們?cè)O(shè)計(jì)了一個(gè)CallbackKue接口用于提供多語(yǔ)言支持。CallbackKue的設(shè)計(jì)非常簡(jiǎn)單,其實(shí)現(xiàn)復(fù)用了KuejobService的代碼。大家可以直接看源碼,一目了然,這里就不細(xì)說(shuō)了。

注意要生成多語(yǔ)言版本的代碼,需要添加相應(yīng)的依賴(lài)。比如要生成Ruby版本的代碼就要向build.gradle中添加compile("io.vertx:vertx-lang-ruby:${vertxVersion}")

KueWorker - 任務(wù)在此處理

好啦,我們已經(jīng)對(duì)Vert.x Kue Core的幾個(gè)核心部分有了大致的了解了,現(xiàn)在是時(shí)候探索一下任務(wù)處理的本源 - KueWorker了~

每一個(gè)worker都對(duì)應(yīng)一個(gè)特定的任務(wù)類(lèi)型,并且綁定著特定的處理函數(shù)(Handler),所以我們需要在創(chuàng)建的時(shí)候指定它們。

prepareAndStart方法

KueWorker中,我們使用prepareAndStart方法來(lái)準(zhǔn)備要處理的任務(wù)并且開(kāi)始處理任務(wù)的過(guò)程:

private void prepareAndStart() {
  this.getJobFromBackend().setHandler(jr -> { // (1)
    if (jr.succeeded()) {
      if (jr.result().isPresent()) {
        this.job = jr.result().get(); // (2)
        process(); // (3)
      } else {
        this.emitJobEvent("error", null, new JsonObject().put("message", "job_not_exist"));
        throw new IllegalStateException("job not exist");
      }
    } else {
        this.emitJobEvent("error", null, new JsonObject().put("message", jr.cause().getMessage()));
        jr.cause().printStackTrace();
    }
  });
}

代碼比較直觀。首先我們通過(guò)getJobFromBackend方法從Redis中按照優(yōu)先級(jí)順序獲取任務(wù) (1)。如果成功獲取任務(wù),我們就把獲取到的任務(wù)保存起來(lái) (2) 然后通過(guò)process方法處理任務(wù) (3)。如果中間出現(xiàn)錯(cuò)誤,我們需要發(fā)送error錯(cuò)誤事件,其中攜帶錯(cuò)誤信息。

使用zpop按照優(yōu)先級(jí)順序獲取任務(wù)

我們來(lái)

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/64982.html

相關(guān)文章

  • Vert.x Blueprint 系列教程() | Vert.x Kue 教程(Web部分)

    摘要:上部分藍(lán)圖教程中我們一起探索了如何用開(kāi)發(fā)一個(gè)基于消息的應(yīng)用。對(duì)部分來(lái)說(shuō),如果看過(guò)我們之前的藍(lán)圖待辦事項(xiàng)服務(wù)開(kāi)發(fā)教程的話,你應(yīng)該對(duì)這一部分非常熟悉了,因此這里我們就不詳細(xì)解釋了。有關(guān)使用實(shí)現(xiàn)的教程可參考藍(lán)圖待辦事項(xiàng)服務(wù)開(kāi)發(fā)教程。 上部分藍(lán)圖教程中我們一起探索了如何用Vert.x開(kāi)發(fā)一個(gè)基于消息的應(yīng)用。在這部分教程中,我們將粗略地探索一下kue-http模塊的實(shí)現(xiàn)。 Vert.x Kue ...

    Kerr1Gan 評(píng)論0 收藏0
  • Vert.x Blueprint 系列教程(三) | Micro-Shop 微服務(wù)應(yīng)用實(shí)戰(zhàn)

    摘要:本教程是藍(lán)圖系列的第三篇教程,對(duì)應(yīng)的版本為。提供了一個(gè)服務(wù)發(fā)現(xiàn)模塊用于發(fā)布和獲取服務(wù)記錄。前端此微服務(wù)的前端部分,目前已整合至組件中。監(jiān)視儀表板用于監(jiān)視微服務(wù)系統(tǒng)的狀態(tài)以及日志統(tǒng)計(jì)數(shù)據(jù)的查看。而服務(wù)則負(fù)責(zé)發(fā)布其它服務(wù)如服務(wù)或消息源并且部署。 本文章是 Vert.x 藍(lán)圖系列 的第三篇教程。全系列: Vert.x Blueprint 系列教程(一) | 待辦事項(xiàng)服務(wù)開(kāi)發(fā)教程 Vert....

    QiShare 評(píng)論0 收藏0
  • Vert.x Blueprint 系列教程(一) | 待辦事項(xiàng)服務(wù)開(kāi)發(fā)教程

    摘要:本文章是藍(lán)圖系列的第一篇教程。是事件驅(qū)動(dòng)的,同時(shí)也是非阻塞的。是一組負(fù)責(zé)分發(fā)和處理事件的線程。注意,我們絕對(duì)不能去阻塞線程,否則事件的處理過(guò)程會(huì)被阻塞,我們的應(yīng)用就失去了響應(yīng)能力。每個(gè)負(fù)責(zé)處理請(qǐng)求并且寫(xiě)入回應(yīng)結(jié)果。 本文章是 Vert.x 藍(lán)圖系列 的第一篇教程。全系列: Vert.x Blueprint 系列教程(一) | 待辦事項(xiàng)服務(wù)開(kāi)發(fā)教程 Vert.x Blueprint 系...

    frank_fun 評(píng)論0 收藏0
  • 使用Vert.x構(gòu)建Web服務(wù)器和消息系統(tǒng)

    摘要:而不是開(kāi)始,將服務(wù)使用多線程的請(qǐng)求重量級(jí)的容器。是啟動(dòng)多個(gè)輕便單線程的服務(wù)器和流量路由到他們。亮點(diǎn)應(yīng)用程序是事件驅(qū)動(dòng),異步和單線程的。通過(guò)使用事件總線傳遞消息通信。為了建立一個(gè)消息系統(tǒng),則需要獲得該事件總線。 摘要 如果你對(duì)Node.js感興趣,Vert.x可能是你的下一個(gè)大事件:一個(gè)建立在JVM上一個(gè)類(lèi)似的架構(gòu)企業(yè)制度。 這一部分介紹Vert.x是通過(guò)兩個(gè)動(dòng)手的例子(基于Vert.x...

    DrizzleX 評(píng)論0 收藏0
  • 【小項(xiàng)目】全棧開(kāi)發(fā)培訓(xùn)手冊(cè) | 后端(1) vert.x框架理解

    摘要:二來(lái),給大家新開(kāi)坑的項(xiàng)目一個(gè)參考。因此,本系列以主要以官方文檔為基礎(chǔ),將盡可能多的特性融入本項(xiàng)目,并標(biāo)注官網(wǎng)原文出處,有興趣的小伙伴可點(diǎn)擊深入了解??梢酝ㄟ^(guò)一些特殊協(xié)議例如將消息作為統(tǒng)一消息服務(wù)導(dǎo)出。下載完成后自行修改和。 開(kāi)坑前言 我給這個(gè)專(zhuān)欄的名氣取名叫做小項(xiàng)目,聽(tīng)名字就知道,這個(gè)專(zhuān)題最終的目的是帶領(lǐng)大家完成一個(gè)項(xiàng)目。為什么要開(kāi)這么大一個(gè)坑呢,一來(lái),雖然網(wǎng)上講IT知識(shí)點(diǎn)的書(shū)籍鋪天蓋...

    hightopo 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<