成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Akka系列(七):Actor持久化之Akka persistence

miguel.jiang / 1148人閱讀

摘要:源碼鏈接進階持久化插件有同學(xué)可能會問,我對不是很熟悉亦或者覺得單機存儲并不是安全,有沒有支持分布式數(shù)據(jù)存儲的插件呢,比如某爸的云數(shù)據(jù)庫答案當(dāng)然是有咯,良心的我當(dāng)然是幫你們都找好咯。

這次把這部分內(nèi)容提到現(xiàn)在寫,是因為這段時間開發(fā)的項目剛好在這一塊遇到了一些難點,所以準(zhǔn)備把經(jīng)驗分享給大家,我們在使用Akka時,會經(jīng)常遇到一些存儲Actor內(nèi)部狀態(tài)的場景,在系統(tǒng)正常運行的情況下,我們不需要擔(dān)心什么,但是當(dāng)系統(tǒng)出錯,比如Actor錯誤需要重啟,或者內(nèi)存溢出,亦或者整個系統(tǒng)崩潰,如果我們不采取一定的方案的話,在系統(tǒng)重啟時Actor的狀態(tài)就會丟失,這會導(dǎo)致我們丟失一些關(guān)鍵的數(shù)據(jù),造成系統(tǒng)數(shù)據(jù)不一致的問題。Akka作為一款成熟的生產(chǎn)環(huán)境應(yīng)用,為我們提供了相應(yīng)的解決方案就是Akka persistence。

為什么需要持久化的Actor?

萬變不離其宗,數(shù)據(jù)的一致性是永恒的主題,一個性能再好的系統(tǒng),不能保證數(shù)據(jù)的正確,也稱不上是一個好的系統(tǒng),一個系統(tǒng)在運行的時候難免會出錯,如何保證系統(tǒng)在出錯后能正確的恢復(fù)數(shù)據(jù),不讓數(shù)據(jù)出現(xiàn)混亂是一個難題。使用Actor模型的時候,我們會有這么一個想法,就是能不對數(shù)據(jù)庫操作就盡量不對數(shù)據(jù)庫操作(這里我們假定我們的數(shù)據(jù)庫是安全,可靠的,能保證數(shù)據(jù)的正確性和一致性,比如使用國內(nèi)某云的云數(shù)據(jù)庫),一方面如果大量的數(shù)據(jù)操作會使數(shù)據(jù)庫面臨的巨大的壓力,導(dǎo)致崩潰,另一方面即使數(shù)據(jù)庫能處理的過來,比如一些count,update的大表操作也會消耗很多的時間,遠沒有內(nèi)存中直接操作來的快,大大影響性能。但是又有人說幾人內(nèi)存操作這么快,為什么不把數(shù)據(jù)都放內(nèi)存中呢?答案顯而易見,當(dāng)出現(xiàn)機器死機,或者內(nèi)存溢出等問題時,數(shù)據(jù)很有可能就丟失了導(dǎo)致無法恢復(fù)。在這種背景下,我們是不是有一種比較好的解決方案,既能滿足需求又能用最小的性能消耗,答案就是上面我們的說的Akka persistence。

Akka persistence的核心架構(gòu)

在具體深入Akka persistence之前,我們可以先了解一下它的核心設(shè)計理念,其實簡單來說,我們可以利用一些thing來恢復(fù)Actor的狀態(tài),這里的thing可以是日志、數(shù)據(jù)庫中的數(shù)據(jù),亦或者是文件,所以說它的本質(zhì)非常容易理解,在Actor處理的時候我們會保存一些數(shù)據(jù),Actor在恢復(fù)的時候能根據(jù)這些數(shù)據(jù)恢復(fù)其自身的狀態(tài)。

所以Akka persistence 有以下幾個關(guān)鍵部分組成:

PersistentActor:任何一個需要持久化的Actor都必須繼承它,并必須定義或者實現(xiàn)其中的三個關(guān)鍵屬性:

 def persistenceId = "example" //作為持久化Actor的唯一表示,用于持久化或者查詢時使用

 def receiveCommand: Receive = ??? //Actor正常運行時處理處理消息邏輯,可在這部分內(nèi)容里持久化自己想要的消息

 def receiveRecover: Receive = ??? //Actor重啟恢復(fù)是執(zhí)行的邏輯

相比普通的Actor,除receiveCommand相似以外,還必須實現(xiàn)另外兩個屬性。
另外在持久化Actor中還有另外兩個關(guān)鍵的的概念就是JournalSnapshot,前者用于持久化事件,后者用于保存Actor的快照,兩者在Actor恢復(fù)狀態(tài)的時候都起到了至關(guān)重要的作用。

Akka persistence的demo實戰(zhàn)

這里我首先會用一個demo讓大家能對Akka persistence的使用有一定了解的,并能大致明白它的工作原理,后面再繼續(xù)講解一些實戰(zhàn)可能會遇到的問題。

假定現(xiàn)在有這么一個場景,現(xiàn)在假設(shè)有一個1w元的大紅包,瞬間可能會很多人同時來搶,每個人搶的金額也可能不一樣,場景很簡單,實現(xiàn)方式也有很多種,但前提是保證數(shù)據(jù)的正確性,比如最普通的使用數(shù)據(jù)庫保證,但對這方面有所了解的同學(xué)都知道這并不是一個很好的方案,因為需要鎖,并需要大量的數(shù)據(jù)庫操作,導(dǎo)致性能不高,那么我們是否可以用Actor來實現(xiàn)這個需求么?答案是當(dāng)然可以。

我們首先來定義一個抽獎命令,

case class LotteryCmd(
  userId: Long, // 參與用戶Id
  username: String, //參與用戶名
  email: String // 參與用戶郵箱
)

然后我們實現(xiàn)一個抽獎Actor,并繼承PersistentActor作出相應(yīng)的實現(xiàn):

case class LuckyEvent(  //抽獎成功事件
    userId: Long,
    luckyMoney: Int
)
case class FailureEvent(  //抽獎失敗事件
    userId: Long,
    reason: String
)
case class Lottery(
    totalAmount: Int,  //紅包總金額
    remainAmount: Int  //剩余紅包金額
) {
  def update(luckyMoney: Int) = {
    copy(
      remainAmount = remainAmount - luckyMoney
    )
  }
}
class LotteryActor(initState: Lottery) extends PersistentActor with ActorLogging{
  override def persistenceId: String = "lottery-actor-1"

  var state = initState  //初始化Actor的狀態(tài)

  override def receiveRecover: Receive = {
    case event: LuckyEvent =>
      updateState(event)  //恢復(fù)Actor時根據(jù)持久化的事件恢復(fù)Actor狀態(tài)
    case SnapshotOffer(_, snapshot: Lottery) =>
      log.info(s"Recover actor state from snapshot and the snapshot is ${snapshot}")
      state = snapshot //利用快照恢復(fù)Actor的狀態(tài)
    case RecoveryCompleted => log.info("the actor recover completed")
  }

  def updateState(le: LuckyEvent) =
    state = state.update(le.luckyMoney)  //更新自身狀態(tài)

  override def receiveCommand: Receive = {
    case lc: LotteryCmd =>
      doLottery(lc) match {     //進行抽獎,并得到抽獎結(jié)果,根據(jù)結(jié)果做出不同的處理
        case le: LuckyEvent =>  //抽到隨機紅包
          persist(le) { event =>
            updateState(event)
            increaseEvtCountAndSnapshot()
            sender() ! event
          }
        case fe: FailureEvent =>  //紅包已經(jīng)抽完
          sender() ! fe
      }
    case "saveSnapshot" =>  // 接收存儲快照命令執(zhí)行存儲快照操作
      saveSnapshot(state)
    case SaveSnapshotSuccess(metadata) =>  ???  //你可以在快照存儲成功后做一些操作,比如刪除之前的快照等
  }

  private def increaseEvtCountAndSnapshot() = {
    val snapShotInterval = 5
    if (lastSequenceNr % snapShotInterval == 0 && lastSequenceNr != 0) {  //當(dāng)有持久化5個事件后我們便存儲一次當(dāng)前Actor狀態(tài)的快照
      self ! "saveSnapshot"
    }
  }

  def doLottery(lc: LotteryCmd) = {  //抽獎邏輯具體實現(xiàn)
    if (state.remainAmount > 0) {
      val luckyMoney = scala.util.Random.nextInt(state.remainAmount) + 1
      LuckyEvent(lc.userId, luckyMoney)
    }
    else {
      FailureEvent(lc.userId, "下次早點來,紅包已被抽完咯!")
    }
  }
}

程序很簡單,關(guān)鍵位置我也給了注釋,相信大家對Actor有所了解的話很容易理解,當(dāng)然要是有些疑惑,可以看看我之前寫的文章,下面我們就對剛才寫的抽紅包Actor進行測試:

object PersistenceTest extends App {
  val lottery = Lottery(10000,10000)
  val system = ActorSystem("example-05")
  val lotteryActor = system.actorOf(Props(new LotteryActor(lottery)), "LotteryActor-1")  //創(chuàng)建抽獎Actor
  val pool: ExecutorService = Executors.newFixedThreadPool(10)
  val r = (1 to 100).map(i =>
    new LotteryRun(lotteryActor, LotteryCmd(i.toLong,"godpan","[email protected]"))  //創(chuàng)建100個抽獎?wù)埱?  )
  r.map(pool.execute(_))  //使用線程池來發(fā)起抽獎?wù)埱螅M同時多人參加
  Thread.sleep(5000)
  pool.shutdown()
  system.terminate()
}

class LotteryRun(lotteryActor: ActorRef, lotteryCmd: LotteryCmd) extends Runnable { //抽獎?wù)埱?  implicit val timeout = Timeout(3.seconds)
  def run: Unit = {
    for {
      fut <- lotteryActor ? lotteryCmd
    } yield fut match {  //根據(jù)不同事件顯示不同的抽獎結(jié)果
      case le: LuckyEvent => println(s"恭喜用戶${le.userId}抽到了${le.luckyMoney}元紅包")
      case fe: FailureEvent =>  println(fe.reason)
      case _ => println("系統(tǒng)錯誤,請重新抽取")
    }
  }
}

運行程序,我們可能看到以下的結(jié)果:

下面我會把persistence actor在整個運行過程的步驟給出,幫助大家理解它的原理:

1.初始化Persistence Actor

1.1若是第一次初始化,則與正常的Actor的初始化一致。

1.2若是重啟恢復(fù)Actor,這根據(jù)Actor之前持久的數(shù)據(jù)恢復(fù)。

1.2.1從快照恢復(fù),可快速恢復(fù)Actor,但并非每次持久化事件都會保存快照,在快照完整的情況下,Actor優(yōu)先從快照恢復(fù)自身狀態(tài)。

1.2.2從事件(日志,數(shù)據(jù)庫記錄等)恢復(fù),通過重放持久化事件恢復(fù)Actor狀態(tài),比較關(guān)鍵。

2.接收命令進行處理,轉(zhuǎn)化為需要持久化的事件(持久化的事件盡量只包含關(guān)鍵性的數(shù)據(jù))使用Persistence Actor的持久化方法進行持久化(上述例子中的persist,后面我會講一下批量持久化),并處理持久化成功后的邏輯處理,比如修改Actor狀態(tài),向外部Actor發(fā)送消息等。

3.若是我們需要存儲快照,那么可以主動指定存儲快照的頻率,比如持久化事件100次我們就存儲一次快照,這個頻率應(yīng)該要考慮實際的業(yè)務(wù)場景,在存儲快照成功后我們也可以執(zhí)行一些操作。

總的來說Persistence Actor運行時的大致操作就是以上這些,當(dāng)然它是r如何持久化事件,恢復(fù)時的機制是怎么樣的等有興趣的可以看一下Akka源碼。

使用Akka persistence的相關(guān)配置

首先我們必須加載相應(yīng)的依賴包,在bulid.sbt中加入以下依賴:

libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.4.16",  //Akka actor 核心依賴
  "com.typesafe.akka" %% "akka-persistence" % "2.4.16", //Akka persistence 依賴
  "org.iq80.leveldb"            % "leveldb"          % "0.7", //leveldb java版本依賴
  "org.fusesource.leveldbjni"   % "leveldbjni-all"   % "1.8", //leveldb java版本依賴
  "com.twitter"              %% "chill-akka"                  % "0.8.0" //事件序列化依賴
)

另外我們還需在application.conf加入以下配置:

akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"

akka.persistence.journal.leveldb.dir = "log/journal"
akka.persistence.snapshot-store.local.dir = "log/snapshots"

# DO NOT USE THIS IN PRODUCTION !!!
# See also https://github.com/typesafehub/activator/issues/287
akka.persistence.journal.leveldb.native = false  //因為我們本地并沒有安裝leveldb,所以這個屬性置為false,但是生產(chǎn)環(huán)境并不推薦使用

akka.actor.serializers {
  kryo = "com.twitter.chill.akka.AkkaSerializer"
}

akka.actor.serialization-bindings {
  "scala.Product" = kryo
  "akka.persistence.PersistentRepr" = kryo
}

至此為止我們整個Akka persistence demo已經(jīng)搭建好了,可以正常運行了,有興趣的同學(xué)可以下載源碼。源碼鏈接

Akka persistence進階 1.持久化插件

有同學(xué)可能會問,我對leveldb不是很熟悉亦或者覺得單機存儲并不是安全,有沒有支持分布式數(shù)據(jù)存儲的插件呢,比如某爸的云數(shù)據(jù)庫?答案當(dāng)然是有咯,良心的我當(dāng)然是幫你們都找好咯。

1.akka-persistence-sql-async: 支持MySQL和PostgreSQL,另外使用了全異步的數(shù)據(jù)庫驅(qū)動,提供異步非阻塞的API,我司用的就是它的變種版,6的飛起。項目地址

2.akka-persistence-cassandra: 官方推薦的插件,使用寫性能very very very fast的cassandra數(shù)據(jù)庫,是幾個插件中比較流行的一個,另外它還支持persistence query。項目地址

3.akka-persistence-redis: redis應(yīng)該也很符合Akka persistence的場景,熟悉redis的同學(xué)可以使用看看。項目地址

4.akka-persistence-jdbc: 怎么能少了jdbc呢?不然怎么對的起java爸爸呢,支持scala和java哦。項目地址

相應(yīng)的插件的具體使用可以看該項目的具體介紹使用,我看了下相對來說都是比較容易的。

2.批量持久化

上面說到我司用的是akka-persistence-sql-async插件,所以我們是將事件和快照持久化到數(shù)據(jù)庫的,一開始我也是像上面demo一樣,每次事件都會持久化到數(shù)據(jù)庫,但是后來在性能測試的時候,因為本身業(yè)務(wù)場景對數(shù)據(jù)庫的壓力也比較大,在當(dāng)數(shù)據(jù)庫到達每秒1000+的讀寫量后,另外說明一下使用的是某云數(shù)據(jù)庫,性能中配以上,發(fā)現(xiàn)每次持久化的時間將近要15ms,這樣換算一下的話Actor每秒只能處理60~70個需要持久化的事件,而實際業(yè)務(wù)場景要求Actor必須在3秒內(nèi)返回處理結(jié)果,這種情況下導(dǎo)致大量消息處理超時得不到反饋,另外還有大量的消息得不到處理,導(dǎo)致系統(tǒng)錯誤暴增,用戶體驗下降,既然我們發(fā)現(xiàn)了問題,那么我們能不能進行優(yōu)化呢?事實上當(dāng)然是可以,既然單個插入慢,那么我們能不能批量插入呢,Akka persistence為我們提供了persistAll方法,下面我就對上面的demo進行一下改造,讓其變成批量持久化:

class LotteryActorN(initState: Lottery) extends PersistentActor with ActorLogging{
  override def persistenceId: String = "lottery-actor-2"

  var state = initState  //初始化Actor的狀態(tài)

  override def receiveRecover: Receive = {
    case event: LuckyEvent =>
      updateState(event)  //恢復(fù)Actor時根據(jù)持久化的事件恢復(fù)Actor狀態(tài)
    case SnapshotOffer(_, snapshot: Lottery) =>
      log.info(s"Recover actor state from snapshot and the snapshot is ${snapshot}")
      state = snapshot //利用快照恢復(fù)Actor的狀態(tài)
    case RecoveryCompleted => log.info("the actor recover completed")
  }

  def updateState(le: LuckyEvent) =
    state = state.update(le.luckyMoney)  //更新自身狀態(tài)

  var lotteryQueue : ArrayBuffer[(LotteryCmd, ActorRef)] = ArrayBuffer()

  context.system.scheduler  //定時器,定時觸發(fā)抽獎邏輯
    .schedule(
      0.milliseconds,
      100.milliseconds,
      new Runnable {
        def run = {
          self ! "doLottery"
        }
      }
    )

  override def receiveCommand: Receive = {
    case lc: LotteryCmd =>
      lotteryQueue = lotteryQueue :+ (lc, sender())  //參與信息加入抽獎隊列
      println(s"the lotteryQueue size is ${lotteryQueue.size}")
      if (lotteryQueue.size > 5)  //當(dāng)參與人數(shù)有5個時觸發(fā)抽獎
        joinN(lotteryQueue)
    case "doLottery" =>
      if (lotteryQueue.size > 0)
        joinN(lotteryQueue)
    case "saveSnapshot" =>  // 接收存儲快照命令執(zhí)行存儲快照操作
      saveSnapshot(state)
    case SaveSnapshotSuccess(metadata) =>  ???  //你可以在快照存儲成功后做一些操作,比如刪除之前的快照等
  }

  private def joinN(lotteryQueue: ArrayBuffer[(LotteryCmd, ActorRef)]) = {  //批量處理抽獎結(jié)果
    val rs = doLotteryN(lotteryQueue)
    val success = rs.collect {  //得到其中中獎的相應(yīng)信息
      case (event: LuckyEvent, ref: ActorRef) =>
        event -> ref
    }.toMap
    val failure = rs.collect {  //得到其中未中獎的相應(yīng)信息
      case (event: FailureEvent, ref: ActorRef) => event -> ref
    }
    persistAll(success.keys.toIndexedSeq) {  //批量持久化中獎用戶事件
      case event =>  println(event)
        updateState(event)
        increaseEvtCountAndSnapshot()
        success(event) ! event
    }
    failure.foreach {
      case (event, ref) => ref ! event
    }
    this.lotteryQueue.clear()  //清空參與隊列
  }


  private def increaseEvtCountAndSnapshot() = {
    val snapShotInterval = 5
    if (lastSequenceNr % snapShotInterval == 0 && lastSequenceNr != 0) {  //當(dāng)有持久化5個事件后我們便存儲一次當(dāng)前Actor狀態(tài)的快照
      self ! "saveSnapshot"
    }
  }

  private def doLotteryN(lotteryQueue: ArrayBuffer[(LotteryCmd, ActorRef)]) = {  //抽獎邏輯具體實現(xiàn)
    var remainAmount = state.remainAmount
    lotteryQueue.map(lq =>
      if (remainAmount > 0) {
        val luckyMoney = scala.util.Random.nextInt(remainAmount) + 1
        remainAmount = remainAmount - luckyMoney
        (LuckyEvent(lq._1.userId, luckyMoney),lq._2)
      }
      else {
        (FailureEvent(lq._1.userId, "下次早點來,紅包已被抽完咯!"),lq._2)
      }
    )
  }
}

這是改造后的參與Actor,實現(xiàn)了批量持久的功能,當(dāng)然這里為了給發(fā)送者返回消息,處理邏輯稍微復(fù)雜了一點,不過真實場景可能會更復(fù)雜,相關(guān)源碼也在剛才的項目上。

3.Persistence Query

另外Akka Persistence還提供了Query接口,用于需要查詢持久化事件的需求,這部分內(nèi)容可能要根據(jù)實際業(yè)務(wù)場景考慮是否需要應(yīng)用,我就不展開講了,另外我也寫了一個小demo在項目中,想要嘗試的同學(xué)也可以試試。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/67470.html

相關(guān)文章

  • 使用akka作異步任務(wù)處理

    摘要:創(chuàng)建訂單時同步操作有查詢庫存,扣款,刷新庫存可異步的操作有通知風(fēng)控系統(tǒng),給買家發(fā)送扣款郵件和短信,通知賣家,創(chuàng)建一些定時任務(wù)。 同步轉(zhuǎn)異步是一種常見的優(yōu)化手段,最近一次在做調(diào)優(yōu)時便大量使用了這種方式。通常在一個業(yè)務(wù)場景中會包含多個操作,有些操作的結(jié)果需要讓用戶立馬知道,但有些操作則不需要。這些用戶不需要等待結(jié)果的操作,我們在編程的時候便可以異步處理。這么做最直接的效果就是縮短接口響應(yīng)速...

    shiweifu 評論0 收藏0
  • Akka系列(一):Akka簡介與Actor模型

    摘要:是一個構(gòu)建在上,基于模型的的并發(fā)框架,為構(gòu)建伸縮性強,有彈性的響應(yīng)式并發(fā)應(yīng)用提高更好的平臺。上述例子中的信件就相當(dāng)于中的消息,與之間只能通過消息通信。當(dāng)然模型比這要復(fù)雜的多,這里主要是簡潔的闡述一下模型的概念。模型的出現(xiàn)解決了這個問題。 Akka是一個構(gòu)建在JVM上,基于Actor模型的的并發(fā)框架,為構(gòu)建伸縮性強,有彈性的響應(yīng)式并發(fā)應(yīng)用提高更好的平臺。本文主要是個人對Akka的學(xué)習(xí)和應(yīng)...

    PingCAP 評論0 收藏0
  • Akka系列(二):Akka中的Actor系統(tǒng)

    摘要:模型作為中最核心的概念,所以在中的組織結(jié)構(gòu)也至關(guān)重要,本文主要介紹中系統(tǒng)。這里主要是演示可以根據(jù)配置文件的內(nèi)容去加載相應(yīng)的環(huán)境,并應(yīng)用到整個中,這對于我們配置環(huán)境來說是非常方便的。路徑與地址熟悉類系統(tǒng)的同學(xué)應(yīng)該對路徑這個概念很熟悉了。 Actor模型作為Akka中最核心的概念,所以Actor在Akka中的組織結(jié)構(gòu)也至關(guān)重要,本文主要介紹Akka中Actor系統(tǒng)。 Actor系統(tǒng) Act...

    BlackFlagBin 評論0 收藏0
  • 關(guān)于分布式計算的一些概念

    摘要:關(guān)于三者的一些概括總結(jié)離線分析框架,適合離線的復(fù)雜的大數(shù)據(jù)處理內(nèi)存計算框架,適合在線離線快速的大數(shù)據(jù)處理流式計算框架,適合在線的實時的大數(shù)據(jù)處理我是一個以架構(gòu)師為年之內(nèi)目標(biāo)的小小白。 整理自《架構(gòu)解密從分布式到微服務(wù)》第七章——聊聊分布式計算.做了相應(yīng)補充和修改。 [TOC] 前言 不管是網(wǎng)絡(luò)、內(nèi)存、還是存儲的分布式,它們最終目的都是為了實現(xiàn)計算的分布式:數(shù)據(jù)在各個計算機節(jié)點上流動,同...

    Ververica 評論0 收藏0
  • Akka系列(三):監(jiān)管與容錯

    摘要:是所有由系統(tǒng)創(chuàng)建的頂級的監(jiān)管者,如日志監(jiān)聽器,或由配置指定在系統(tǒng)啟動時自動部署的。所有其他被上升到根監(jiān)管者,然后整個系統(tǒng)將會關(guān)閉。監(jiān)管容錯示例本示例主要演示在發(fā)生錯誤時,它的監(jiān)管者會根據(jù)相應(yīng)的監(jiān)管策略進行不同的處理。 Akka作為一種成熟的生產(chǎn)環(huán)境并發(fā)解決方案,必須擁有一套完善的錯誤異常處理機制,本文主要講講Akka中的監(jiān)管和容錯。 監(jiān)管 看過我上篇文章的同學(xué)應(yīng)該對Actor系統(tǒng)的工作...

    shevy 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<