成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

NATS--NATS Streaming持久化

qiangdada / 1194人閱讀

摘要:持久訂閱會(huì)使得對(duì)應(yīng)服務(wù)跟蹤客戶(hù)端最后確認(rèn)消息的序列號(hào)和持久名稱(chēng)。運(yùn)行基于的持久化示例你將會(huì)看到如下的輸出可以看出默認(rèn)的是基于內(nèi)存的持久化。

前言

最近項(xiàng)目中需要使用到一個(gè)消息隊(duì)列,主要用來(lái)將原來(lái)一些操作異步化。根據(jù)自己的使用場(chǎng)景和熟悉程度,選擇了NATS Streaming。之所以,選擇NATS Streaming。一,因?yàn)槲疫x型一些中間件,我會(huì)優(yōu)先選取一些自己熟悉的語(yǔ)言編寫(xiě)的,這樣方便排查問(wèn)題和進(jìn)一步的深究。二,因?yàn)樽约阂恢弊鰇8s等云原生這塊,偏向于cncf基金會(huì)管理的項(xiàng)目,畢竟這些項(xiàng)目從一開(kāi)始就考慮了如何部署在k8s當(dāng)中。三,是評(píng)估項(xiàng)目在不斷發(fā)展過(guò)程中,引入的組件是否能夠依舊滿(mǎn)足需求。

消息隊(duì)列的使用場(chǎng)景

如果問(wèn)為什么這么做,需要說(shuō)一下消息隊(duì)列的使用場(chǎng)景。之前看知乎的時(shí)候,看到一些回答比較認(rèn)同,暫時(shí)拿過(guò)來(lái),更能形象表達(dá)。感謝ScienJus同學(xué)的精彩解答。

消息隊(duì)列的主要特點(diǎn)是異步處理,主要目的是減少請(qǐng)求響應(yīng)時(shí)間和解耦。所以主要的使用場(chǎng)景就是將比較耗時(shí)而且不需要即時(shí)(同步)返回結(jié)果的操作作為消息放入消息隊(duì)列。同時(shí)由于使用了消息隊(duì)列,只要保證消息格式不變,消息的發(fā)送方和接收方并不需要彼此聯(lián)系,也不需要受對(duì)方的影響,即解耦和。

使用場(chǎng)景的話,舉個(gè)例子:

假設(shè)用戶(hù)在你的軟件中注冊(cè),服務(wù)端收到用戶(hù)的注冊(cè)請(qǐng)求后,它會(huì)做這些操作:

校驗(yàn)用戶(hù)名等信息,如果沒(méi)問(wèn)題會(huì)在數(shù)據(jù)庫(kù)中添加一個(gè)用戶(hù)記錄

如果是用郵箱注冊(cè)會(huì)給你發(fā)送一封注冊(cè)成功的郵件,手機(jī)注冊(cè)則會(huì)發(fā)送一條短信

分析用戶(hù)的個(gè)人信息,以便將來(lái)向他推薦一些志同道合的人,或向那些人推薦他

發(fā)送給用戶(hù)一個(gè)包含操作指南的系統(tǒng)通知等等……

但是對(duì)于用戶(hù)來(lái)說(shuō),注冊(cè)功能實(shí)際只需要第一步,只要服務(wù)端將他的賬戶(hù)信息存到數(shù)據(jù)庫(kù)中他便可以登錄上去做他想做的事情了。至于其他的事情,非要在這一次請(qǐng)求中全部完成么?值得用戶(hù)浪費(fèi)時(shí)間等你處理這些對(duì)他來(lái)說(shuō)無(wú)關(guān)緊要的事情么?所以實(shí)際當(dāng)?shù)谝徊阶鐾旰螅?wù)端就可以把其他的操作放入對(duì)應(yīng)的消息隊(duì)列中然后馬上返回用戶(hù)結(jié)果,由消息隊(duì)列異步的進(jìn)行這些操作。

或者還有一種情況,同時(shí)有大量用戶(hù)注冊(cè)你的軟件,再高并發(fā)情況下注冊(cè)請(qǐng)求開(kāi)始出現(xiàn)一些問(wèn)題,例如郵件接口承受不住,或是分析信息時(shí)的大量計(jì)算使cpu滿(mǎn)載,這將會(huì)出現(xiàn)雖然用戶(hù)數(shù)據(jù)記錄很快的添加到數(shù)據(jù)庫(kù)中了,但是卻卡在發(fā)郵件或分析信息時(shí)的情況,導(dǎo)致請(qǐng)求的響應(yīng)時(shí)間大幅增長(zhǎng),甚至出現(xiàn)超時(shí),這就有點(diǎn)不劃算了。面對(duì)這種情況一般也是將這些操作放入消息隊(duì)列(生產(chǎn)者消費(fèi)者模型),消息隊(duì)列慢慢的進(jìn)行處理,同時(shí)可以很快的完成注冊(cè)請(qǐng)求,不會(huì)影響用戶(hù)使用其他功能。

所以在軟件的正常功能開(kāi)發(fā)中,并不需要去刻意的尋找消息隊(duì)列的使用場(chǎng)景,而是當(dāng)出現(xiàn)性能瓶頸時(shí),去查看業(yè)務(wù)邏輯是否存在可以異步處理的耗時(shí)操作,如果存在的話便可以引入消息隊(duì)列來(lái)解決。否則盲目的使用消息隊(duì)列可能會(huì)增加維護(hù)和開(kāi)發(fā)的成本卻無(wú)法得到可觀的性能提升,那就得不償失了。

其實(shí),總結(jié)一下消息隊(duì)列的作用

削峰,形象點(diǎn)的話,可以比喻為蓄水池。比如elk日志收集系統(tǒng)中的kafka,主要在日志高峰期的時(shí)候,在犧牲實(shí)時(shí)性的同時(shí),保證了整個(gè)系統(tǒng)的安全。

同步系統(tǒng)異構(gòu)化。原先一個(gè)同步操作里的諸多步驟,可以考慮將一些不影響主線發(fā)展的步驟,通過(guò)消息隊(duì)列異步處理。比如,電商行業(yè),一個(gè)訂單完成之后,一般除了直接返回給客戶(hù)購(gòu)買(mǎi)成功的消息,還要通知賬戶(hù)組進(jìn)行扣費(fèi),通知處理庫(kù)存變化,通知物流進(jìn)行派送等,通知一些用戶(hù)組做一些增加會(huì)員積分等操作等。

NATS Streaming 簡(jiǎn)介

NATS Streaming是一個(gè)由NATS驅(qū)動(dòng)的數(shù)據(jù)流系統(tǒng),用Go編程語(yǔ)言編寫(xiě)。 NATS Streaming服務(wù)器的可執(zhí)行文件名是nats-streaming-server。 NATS Streaming與核心NATS平臺(tái)無(wú)縫嵌入,擴(kuò)展和互操作。 NATS Streaming服務(wù)器作為Apache-2.0許可下的開(kāi)源軟件提供。 Synadia積極維護(hù)和支持NATS Streaming服務(wù)器。

特點(diǎn)

除了核心NATS平臺(tái)的功能外,NATS Streaming還提供以下功能:

增強(qiáng)消息協(xié)議

NATS Streaming使用谷歌協(xié)議緩沖區(qū)實(shí)現(xiàn)自己的增強(qiáng)型消息格式。這些消息通過(guò)二進(jìn)制數(shù)據(jù)流在NATS核心平臺(tái)進(jìn)行傳播,因此不需要改變NATS的基本協(xié)議。NATS Streaming信息包含以下字段:

  - 序列 - 一個(gè)全局順序序列號(hào)為主題的通道
  - 主題 - 是NATS Streaming 交付對(duì)象
  - 答復(fù)內(nèi)容 - 對(duì)應(yīng)"reply-to"對(duì)應(yīng)的對(duì)象內(nèi)容
  - 數(shù)據(jù) - 真是數(shù)據(jù)內(nèi)容
  - 時(shí)間戳 - 接收的時(shí)間戳,單位是納秒
  - 重復(fù)發(fā)送 - 標(biāo)志這條數(shù)據(jù)是否需要服務(wù)再次發(fā)送
  - CRC32 - 一個(gè)循環(huán)冗余數(shù)據(jù)校驗(yàn)選項(xiàng),在數(shù)據(jù)存儲(chǔ)和數(shù)據(jù)通訊領(lǐng)域里,為了保證數(shù)據(jù)的正確性所采用的檢錯(cuò)手段,這里使用的是 IEEE CRC32 算法

 - 消息/事件的持久性
  NATS Streaming提供了可配置的消息持久化,持久目的地可以為內(nèi)存或者文件。另外,對(duì)應(yīng)的存儲(chǔ)子系統(tǒng)使用了一個(gè)公共接口允許我們開(kāi)發(fā)自己自定義實(shí)現(xiàn)來(lái)持久化對(duì)應(yīng)的消息

 - 至少一次的發(fā)送
  NATS Streaming提供了發(fā)布者和服務(wù)器之間的消息確認(rèn)(發(fā)布操作) 和訂閱者和服務(wù)器之間的消息確認(rèn)(確認(rèn)消息發(fā)送)。其中消息被保存在服務(wù)器端內(nèi)存或者輔助存儲(chǔ)(或其他外部存儲(chǔ)器)用來(lái)為需要重新接受消息的訂閱者進(jìn)行重發(fā)消息。

 - 發(fā)布者發(fā)送速率限定
  NATS Streaming提供了一個(gè)連接選項(xiàng)叫 MaxPubAcksInFlight,它能有效的限制一個(gè)發(fā)布者可能隨意的在任何時(shí)候發(fā)送的未被確認(rèn)的消息。當(dāng)達(dá)到這個(gè)配置的最大數(shù)量時(shí),異步發(fā)送調(diào)用接口將會(huì)被阻塞,直到未確認(rèn)消息降到指定數(shù)量之下。

- 每個(gè)訂閱者的速率匹配/限制
  NATS Streaming運(yùn)行指定的訂閱中設(shè)置一個(gè)參數(shù)為 MaxInFlight,它用來(lái)指定已確認(rèn)但未消費(fèi)的最大數(shù)據(jù)量,當(dāng)達(dá)到這個(gè)限制時(shí),NATS Streaming 將暫停發(fā)送消息給訂閱者,直到未確認(rèn)的數(shù)據(jù)量小于設(shè)定的量為止

以主題重發(fā)的歷史數(shù)據(jù)

  新訂閱的可以在已經(jīng)存儲(chǔ)起來(lái)的訂閱的主題頻道指定起始位置消息流。通過(guò)使用這個(gè)選項(xiàng),消息就可以開(kāi)始發(fā)送傳遞了:

  1. 訂閱的主題存儲(chǔ)的最早的信息
  2. 與當(dāng)前訂閱主題之前的最近存儲(chǔ)的數(shù)據(jù),這通常被認(rèn)為是 "最后的值" 或 "初值" 對(duì)應(yīng)的緩存
  3. 一個(gè)以納秒為基準(zhǔn)的 日期/時(shí)間
  4. 一個(gè)歷史的起始位置相對(duì)當(dāng)前服務(wù)的 日期/時(shí)間,例如:最后30秒
  5. 一個(gè)特定的消息序列號(hào)

持久訂閱

  訂閱也可以指定一個(gè)“持久化的名稱(chēng)”可以在客戶(hù)端重啟時(shí)不受影響。持久訂閱會(huì)使得對(duì)應(yīng)服務(wù)跟蹤客戶(hù)端最后確認(rèn)消息的序列號(hào)和持久名稱(chēng)。當(dāng)這個(gè)客戶(hù)端重啟或者重新訂閱的時(shí)候,使用相同的客戶(hù)端ID 和 持久化的名稱(chēng),對(duì)應(yīng)的服務(wù)將會(huì)從最早的未被確認(rèn)的消息處恢復(fù)。

docker 運(yùn)行NATS Streaming

在運(yùn)行之前,前面已經(jīng)講過(guò)NATS Streaming 相比nats,多了持久化的一個(gè)future。所以我們?cè)诮酉聛?lái)的demo演示中,會(huì)重點(diǎn)說(shuō)這點(diǎn)。

運(yùn)行基于memory的持久化示例:
docker run -ti -p 4222:4222 -p 8222:8222  nats-streaming:0.12.0

你將會(huì)看到如下的輸出:

[1] 2019/02/26 08:13:01.769734 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.12.0
[1] 2019/02/26 08:13:01.769811 [INF] STREAM: ServerID: arfYGWPtu7Cn8Ojcb1yko3
[1] 2019/02/26 08:13:01.769826 [INF] STREAM: Go version: go1.11.5
[1] 2019/02/26 08:13:01.770363 [INF] Starting nats-server version 1.4.1
[1] 2019/02/26 08:13:01.770398 [INF] Git commit [not set]
[4] 2019/02/26 08:13:01.770492 [INF] Starting http monitor on 0.0.0.0:8222
[1] 2019/02/26 08:13:01.770555 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2019/02/26 08:13:01.770581 [INF] Server is ready
[1] 2019/02/26 08:13:01.799435 [INF] STREAM: Recovering the state...
[1] 2019/02/26 08:13:01.799461 [INF] STREAM: No recovered state
[1] 2019/02/26 08:13:02.052460 [INF] STREAM: Message store is MEMORY
[1] 2019/02/26 08:13:02.052552 [INF] STREAM: ---------- Store Limits ----------
[1] 2019/02/26 08:13:02.052574 [INF] STREAM: Channels:                  100 *
[1] 2019/02/26 08:13:02.052586 [INF] STREAM: --------- Channels Limits --------
[1] 2019/02/26 08:13:02.052601 [INF] STREAM:   Subscriptions:          1000 *
[1] 2019/02/26 08:13:02.052613 [INF] STREAM:   Messages     :       1000000 *
[1] 2019/02/26 08:13:02.052624 [INF] STREAM:   Bytes        :     976.56 MB *
[1] 2019/02/26 08:13:02.052635 [INF] STREAM:   Age          :     unlimited *
[1] 2019/02/26 08:13:02.052649 [INF] STREAM:   Inactivity   :     unlimited *
[1] 2019/02/26 08:13:02.052697 [INF] STREAM: ----------------------------------

可以看出默認(rèn)的是基于內(nèi)存的持久化。

運(yùn)行基于file的持久化示例:
docker run -ti -v /Users/gao/test/mq:/datastore  -p 4222:4222 -p 8222:8222  nats-streaming:0.12.0  -store file --dir /datastore -m 8222

你將會(huì)看到如下的輸出:

[1] 2019/02/26 08:16:07.641972 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.12.0
[1] 2019/02/26 08:16:07.642038 [INF] STREAM: ServerID: 9d4H6GAFPibpZv282KY9QM
[1] 2019/02/26 08:16:07.642099 [INF] STREAM: Go version: go1.11.5
[1] 2019/02/26 08:16:07.643733 [INF] Starting nats-server version 1.4.1
[1] 2019/02/26 08:16:07.643762 [INF] Git commit [not set]
[5] 2019/02/26 08:16:07.643894 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2019/02/26 08:16:07.643932 [INF] Server is ready
[1] 2019/02/26 08:16:07.672145 [INF] STREAM: Recovering the state...
[1] 2019/02/26 08:16:07.679327 [INF] STREAM: No recovered state
[1] 2019/02/26 08:16:07.933519 [INF] STREAM: Message store is FILE
[1] 2019/02/26 08:16:07.933570 [INF] STREAM: Store location: /datastore
[1] 2019/02/26 08:16:07.933633 [INF] STREAM: ---------- Store Limits ----------
[1] 2019/02/26 08:16:07.933679 [INF] STREAM: Channels:                  100 *
[1] 2019/02/26 08:16:07.933697 [INF] STREAM: --------- Channels Limits --------
[1] 2019/02/26 08:16:07.933711 [INF] STREAM:   Subscriptions:          1000 *
[1] 2019/02/26 08:16:07.933749 [INF] STREAM:   Messages     :       1000000 *
[1] 2019/02/26 08:16:07.933793 [INF] STREAM:   Bytes        :     976.56 MB *
[1] 2019/02/26 08:16:07.933837 [INF] STREAM:   Age          :     unlimited *
[1] 2019/02/26 08:16:07.933857 [INF] STREAM:   Inactivity   :     unlimited *
[1] 2019/02/26 08:16:07.933885 [INF] STREAM: ----------------------------------
PS

如果部署在k8s當(dāng)中,那么就可以采取基于file的持久化,通過(guò)掛載一個(gè)塊存儲(chǔ)來(lái)保證,數(shù)據(jù)可靠。比如,aws的ebs或是ceph的rbd。

4222為客戶(hù)端連接的端口。8222為監(jiān)控端口。

啟動(dòng)以后訪問(wèn):localhost:8222,可以看到如下的網(wǎng)頁(yè):

啟動(dòng)參數(shù)解析
Streaming Server Options:
    -cid, --cluster_id           Cluster ID (default: test-cluster)
    -st,  --store                Store type: MEMORY|FILE|SQL (default: MEMORY)
          --dir                  For FILE store type, this is the root directory
    -mc,  --max_channels            Max number of channels (0 for unlimited)
    -msu, --max_subs                Max number of subscriptions per channel (0 for unlimited)
    -mm,  --max_msgs                Max number of messages per channel (0 for unlimited)
    -mb,  --max_bytes              Max messages total size per channel (0 for unlimited)
    -ma,  --max_age            Max duration a message can be stored ("0s" for unlimited)
    -mi,  --max_inactivity     Max inactivity (no new message, no subscription) after which a channel can be garbage collected (0 for unlimited)
    -ns,  --nats_server          Connect to this external NATS Server URL (embedded otherwise)
    -sc,  --stan_config          Streaming server configuration file
    -hbi, --hb_interval        Interval at which server sends heartbeat to a client
    -hbt, --hb_timeout         How long server waits for a heartbeat response
    -hbf, --hb_fail_count           Number of failed heartbeats before server closes the client connection
          --ft_group             Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore
    -sl,  --signal [=]      Send signal to nats-streaming-server process (stop, quit, reopen)
          --encrypt                Specify if server should use encryption at rest
          --encryption_cipher    Cipher to use for encryption. Currently support AES and CHAHA (ChaChaPoly). Defaults to AES
          --encryption_key        Encryption Key. It is recommended to specify it through the NATS_STREAMING_ENCRYPTION_KEY environment variable instead

Streaming Server Clustering Options:
    --clustered                    Run the server in a clustered configuration (default: false)
    --cluster_node_id            ID of the node within the cluster if there is no stored ID (default: random UUID)
    --cluster_bootstrap            Bootstrap the cluster if there is no existing state by electing self as leader (default: false)
    --cluster_peers              List of cluster peer node IDs to bootstrap cluster state.
    --cluster_log_path           Directory to store log replication data
    --cluster_log_cache_size        Number of log entries to cache in memory to reduce disk IO (default: 512)
    --cluster_log_snapshots         Number of log snapshots to retain (default: 2)
    --cluster_trailing_logs         Number of log entries to leave after a snapshot and compaction
    --cluster_sync                 Do a file sync after every write to the replication log and message store
    --cluster_raft_logging         Enable logging from the Raft library (disabled by default)

Streaming Server File Store Options:
    --file_compact_enabled         Enable file compaction
    --file_compact_frag             File fragmentation threshold for compaction
    --file_compact_interval         Minimum interval (in seconds) between file compactions
    --file_compact_min_size        Minimum file size for compaction
    --file_buffer_size             File buffer size (in bytes)
    --file_crc                     Enable file CRC-32 checksum
    --file_crc_poly                 Polynomial used to make the table used for CRC-32 checksum
    --file_sync                    Enable File.Sync on Flush
    --file_slice_max_msgs           Maximum number of messages per file slice (subject to channel limits)
    --file_slice_max_bytes         Maximum file slice size - including index file (subject to channel limits)
    --file_slice_max_age       Maximum file slice duration starting when the first message is stored (subject to channel limits)
    --file_slice_archive_script  Path to script to use if you want to archive a file slice being removed
    --file_fds_limit                Store will try to use no more file descriptors than this given limit
    --file_parallel_recovery        On startup, number of channels that can be recovered in parallel
    --file_truncate_bad_eof        Truncate files for which there is an unexpected EOF on recovery, dataloss may occur

Streaming Server SQL Store Options:
    --sql_driver             Name of the SQL Driver ("mysql" or "postgres")
    --sql_source             Datasource used when opening an SQL connection to the database
    --sql_no_caching           Enable/Disable caching for improved performance
    --sql_max_open_conns        Maximum number of opened connections to the database

Streaming Server TLS Options:
    -secure                    Use a TLS connection to the NATS server without
                                     verification; weaker than specifying certificates.
    -tls_client_key          Client key for the streaming server
    -tls_client_cert         Client certificate for the streaming server
    -tls_client_cacert       Client certificate CA for the streaming server

Streaming Server Logging Options:
    -SD, --stan_debug=         Enable STAN debugging output
    -SV, --stan_trace=         Trace the raw STAN protocol
    -SDV                             Debug and trace STAN
         --syslog_name               On Windows, when running several servers as a service, use this name for the event source
    (See additional NATS logging options below)

Embedded NATS Server Options:
    -a, --addr               Bind to host address (default: 0.0.0.0)
    -p, --port                  Use port for clients (default: 4222)
    -P, --pid                File to store PID
    -m, --http_port             Use port for http monitoring
    -ms,--https_port            Use port for https monitoring
    -c, --config             Configuration file

Logging Options:
    -l, --log                File to redirect log output
    -T, --logtime=             Timestamp log entries (default: true)
    -s, --syslog             Enable syslog as log method
    -r, --remote_syslog      Syslog server addr (udp://localhost:514)
    -D, --debug=               Enable debugging output
    -V, --trace=               Trace the raw protocol
    -DV                              Debug and trace

Authorization Options:
        --user               User required for connections
        --pass               Password required for connections
        --auth               Authorization token required for connections

TLS Options:
        --tls=                 Enable TLS, do not verify clients (default: false)
        --tlscert            Server certificate file
        --tlskey             Private key for server certificate
        --tlsverify=           Enable TLS, verify client certificates
        --tlscacert          Client certificate CA for verification

NATS Clustering Options:
        --routes        Routes to solicit and connect
        --cluster            Cluster URL for solicited routes

Common Options:
    -h, --help                       Show this message
    -v, --version                    Show version
        --help_tls                   TLS help.
源碼簡(jiǎn)單分析NATS Streaming 持久化

目前NATS Streaming支持以下4種持久化方式:

MEMORY

FILE

SQL

RAFT

其實(shí)看源碼可以知道:NATS Streaming的store基于接口實(shí)現(xiàn),很容易擴(kuò)展到更多的持久化方式。具體的接口如下:

// Store is the storage interface for NATS Streaming servers.
//
// If an implementation has a Store constructor with StoreLimits, it should be
// noted that the limits don"t apply to any state being recovered, for Store
// implementations supporting recovery.
//
type Store interface {
    // GetExclusiveLock is an advisory lock to prevent concurrent
    // access to the store from multiple instances.
    // This is not to protect individual API calls, instead, it
    // is meant to protect the store for the entire duration the
    // store is being used. This is why there is no `Unlock` API.
    // The lock should be released when the store is closed.
    //
    // If an exclusive lock can be immediately acquired (that is,
    // it should not block waiting for the lock to be acquired),
    // this call will return `true` with no error. Once a store
    // instance has acquired an exclusive lock, calling this
    // function has no effect and `true` with no error will again
    // be returned.
    //
    // If the lock cannot be acquired, this call will return
    // `false` with no error: the caller can try again later.
    //
    // If, however, the lock cannot be acquired due to a fatal
    // error, this call should return `false` and the error.
    //
    // It is important to note that the implementation should
    // make an effort to distinguish error conditions deemed
    // fatal (and therefore trying again would invariably result
    // in the same error) and those deemed transient, in which
    // case no error should be returned to indicate that the
    // caller could try later.
    //
    // Implementations that do not support exclusive locks should
    // return `false` and `ErrNotSupported`.
    GetExclusiveLock() (bool, error)

    // Init can be used to initialize the store with server"s information.
    Init(info *spb.ServerInfo) error

    // Name returns the name type of this store (e.g: MEMORY, FILESTORE, etc...).
    Name() string

    // Recover returns the recovered state.
    // Implementations that do not persist state and therefore cannot
    // recover from a previous run MUST return nil, not an error.
    // However, an error must be returned for implementations that are
    // attempting to recover the state but fail to do so.
    Recover() (*RecoveredState, error)

    // SetLimits sets limits for this store. The action is not expected
    // to be retroactive.
    // The store implementation should make a deep copy as to not change
    // the content of the structure passed by the caller.
    // This call may return an error due to limits validation errors.
    SetLimits(limits *StoreLimits) error

    // GetChannelLimits returns the limit for this channel. If the channel
    // does not exist, returns nil.
    GetChannelLimits(name string) *ChannelLimits

    // CreateChannel creates a Channel.
    // Implementations should return ErrAlreadyExists if the channel was
    // already created.
    // Limits defined for this channel in StoreLimits.PeChannel map, if present,
    // will apply. Otherwise, the global limits in StoreLimits will apply.
    CreateChannel(channel string) (*Channel, error)

    // DeleteChannel deletes a Channel.
    // Implementations should make sure that if no error is returned, the
    // channel would not be recovered after a restart, unless CreateChannel()
    // with the same channel is invoked.
    // If processing is expecting to be time consuming, work should be done
    // in the background as long as the above condition is guaranteed.
    // It is also acceptable for an implementation to have CreateChannel()
    // return an error if background deletion is still happening for a
    // channel of the same name.
    DeleteChannel(channel string) error

    // AddClient stores information about the client identified by `clientID`.
    AddClient(info *spb.ClientInfo) (*Client, error)

    // DeleteClient removes the client identified by `clientID` from the store.
    DeleteClient(clientID string) error

    // Close closes this store (including all MsgStore and SubStore).
    // If an exclusive lock was acquired, the lock shall be released.
    Close() error
}

官方也提供了mysql和pgsql兩種數(shù)據(jù)的支持:

postgres.db.sql

CREATE TABLE IF NOT EXISTS ServerInfo (uniquerow INTEGER DEFAULT 1, id VARCHAR(1024), proto BYTEA, version INTEGER, PRIMARY KEY (uniquerow));
CREATE TABLE IF NOT EXISTS Clients (id VARCHAR(1024), hbinbox TEXT, PRIMARY KEY (id));
CREATE TABLE IF NOT EXISTS Channels (id INTEGER, name VARCHAR(1024) NOT NULL, maxseq BIGINT DEFAULT 0, maxmsgs INTEGER DEFAULT 0, maxbytes BIGINT DEFAULT 0, maxage BIGINT DEFAULT 0, deleted BOOL DEFAULT FALSE, PRIMARY KEY (id));
CREATE INDEX Idx_ChannelsName ON Channels (name(256));
CREATE TABLE IF NOT EXISTS Messages (id INTEGER, seq BIGINT, timestamp BIGINT, size INTEGER, data BYTEA, CONSTRAINT PK_MsgKey PRIMARY KEY(id, seq));
CREATE INDEX Idx_MsgsTimestamp ON Messages (timestamp);
CREATE TABLE IF NOT EXISTS Subscriptions (id INTEGER, subid BIGINT, lastsent BIGINT DEFAULT 0, proto BYTEA, deleted BOOL DEFAULT FALSE, CONSTRAINT PK_SubKey PRIMARY KEY(id, subid));
CREATE TABLE IF NOT EXISTS SubsPending (subid BIGINT, row BIGINT, seq BIGINT DEFAULT 0, lastsent BIGINT DEFAULT 0, pending BYTEA, acks BYTEA, CONSTRAINT PK_MsgPendingKey PRIMARY KEY(subid, row));
CREATE INDEX Idx_SubsPendingSeq ON SubsPending (seq);
CREATE TABLE IF NOT EXISTS StoreLock (id VARCHAR(30), tick BIGINT DEFAULT 0);

-- Updates for 0.10.0
ALTER TABLE Clients ADD proto BYTEA;

mysql.db.sql

CREATE TABLE IF NOT EXISTS ServerInfo (uniquerow INT DEFAULT 1, id VARCHAR(1024), proto BLOB, version INTEGER, PRIMARY KEY (uniquerow));
CREATE TABLE IF NOT EXISTS Clients (id VARCHAR(1024), hbinbox TEXT, PRIMARY KEY (id(256)));
CREATE TABLE IF NOT EXISTS Channels (id INTEGER, name VARCHAR(1024) NOT NULL, maxseq BIGINT UNSIGNED DEFAULT 0, maxmsgs INTEGER DEFAULT 0, maxbytes BIGINT DEFAULT 0, maxage BIGINT DEFAULT 0, deleted BOOL DEFAULT FALSE, PRIMARY KEY (id), INDEX Idx_ChannelsName (name(256)));
CREATE TABLE IF NOT EXISTS Messages (id INTEGER, seq BIGINT UNSIGNED, timestamp BIGINT, size INTEGER, data BLOB, CONSTRAINT PK_MsgKey PRIMARY KEY(id, seq), INDEX Idx_MsgsTimestamp (timestamp));
CREATE TABLE IF NOT EXISTS Subscriptions (id INTEGER, subid BIGINT UNSIGNED, lastsent BIGINT UNSIGNED DEFAULT 0, proto BLOB, deleted BOOL DEFAULT FALSE, CONSTRAINT PK_SubKey PRIMARY KEY(id, subid));
CREATE TABLE IF NOT EXISTS SubsPending (subid BIGINT UNSIGNED, `row` BIGINT UNSIGNED, seq BIGINT UNSIGNED DEFAULT 0, lastsent BIGINT UNSIGNED DEFAULT 0, pending BLOB, acks BLOB, CONSTRAINT PK_MsgPendingKey PRIMARY KEY(subid, `row`), INDEX Idx_SubsPendingSeq(seq));
CREATE TABLE IF NOT EXISTS StoreLock (id VARCHAR(30), tick BIGINT UNSIGNED DEFAULT 0);

# Updates for 0.10.0
ALTER TABLE Clients ADD proto BLOB;
總結(jié)

后續(xù)會(huì)詳細(xì)解讀一下代碼實(shí)現(xiàn)和一些集群部署。當(dāng)然肯定少不了如何部署高可用的集群在k8s當(dāng)中。

參閱文章:

NATS Streaming詳解

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/27690.html

相關(guān)文章

  • Spark Streaming學(xué)習(xí)筆記

    摘要:輸入和接收器輸入代表從某種流式數(shù)據(jù)源流入的數(shù)據(jù)流。文件數(shù)據(jù)流可以從任何兼容包括等的文件系統(tǒng),創(chuàng)建方式如下將監(jiān)視該目錄,并處理該目錄下任何新建的文件目前還不支持嵌套目錄。會(huì)被一個(gè)個(gè)依次推入隊(duì)列,而則會(huì)依次以數(shù)據(jù)流形式處理這些的數(shù)據(jù)。 特點(diǎn): Spark Streaming能夠?qū)崿F(xiàn)對(duì)實(shí)時(shí)數(shù)據(jù)流的流式處理,并具有很好的可擴(kuò)展性、高吞吐量和容錯(cuò)性。 Spark Streaming支持從多種數(shù)...

    陸斌 評(píng)論0 收藏0
  • Spark Streaming遇到問(wèn)題分析

    摘要:遇到問(wèn)題分析之后搞了個(gè)還沒(méi)仔細(xì)了解可參考的與的有區(qū)別及并發(fā)控制先看看的,與的這幾個(gè)概念。一個(gè)可以認(rèn)為就是會(huì)最終輸出一個(gè)結(jié)果的一條由組織而成的計(jì)算。在中,我們通過(guò)使用新極大地增強(qiáng)對(duì)狀態(tài)流處理的支持。 Spark Streaming遇到問(wèn)題分析 1、Spark2.0之后搞了個(gè)Structured Streaming 還沒(méi)仔細(xì)了解,可參考:https://github.com/lw-lin/...

    stormzhang 評(píng)論0 收藏0
  • TBSSQL 的那些事 | TiDB Hackathon 2018 優(yōu)秀項(xiàng)目分享

    摘要:當(dāng)我們正準(zhǔn)備做前期調(diào)研和設(shè)計(jì)的時(shí)候,主辦方把唐長(zhǎng)老拉去做現(xiàn)場(chǎng)導(dǎo)師,參賽規(guī)則規(guī)定導(dǎo)師不能下場(chǎng)比賽,囧,于是就這樣被被動(dòng)放了鴿子。川總早早來(lái)到現(xiàn)場(chǎng)。 本文作者是來(lái)自 TiBoys 隊(duì)的崔秋同學(xué),他們的項(xiàng)目 TBSSQL 在 TiDB Hackathon 2018 中獲得了一等獎(jiǎng)。TiDB Batch and Streaming SQL(簡(jiǎn)稱(chēng) TBSSQL)擴(kuò)展了 TiDB 的 SQL 引擎...

    KnewOne 評(píng)論0 收藏0
  • 阿里巴巴為什么選擇Apache Flink?

    摘要:從長(zhǎng)遠(yuǎn)來(lái)看,阿里決定用做一個(gè)統(tǒng)一的通用的大數(shù)據(jù)引擎作為未來(lái)的選型。在阿里的現(xiàn)狀基于在阿里巴巴搭建的平臺(tái)于年正式上線,并從阿里巴巴的搜索和推薦這兩大場(chǎng)景開(kāi)始實(shí)現(xiàn)。目前阿里巴巴所有的業(yè)務(wù),包括阿里巴巴所有子公司都采用了基于搭建的實(shí)時(shí)計(jì)算平臺(tái)。 本文主要整理自阿里巴巴計(jì)算平臺(tái)事業(yè)部資深技術(shù)專(zhuān)家莫問(wèn)在云棲大會(huì)的演講。 合抱之木,生于毫末 隨著人工智能時(shí)代的降臨,數(shù)據(jù)量的爆發(fā),在典型的大數(shù)據(jù)的業(yè)...

    CoderBear 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<